Bug 1079729: Fix handling of increasing number of SCTP channels used by DataChannels r=tuexen a=lsblakk
authorRandell Jesup <rjesup@jesup.org>
Tue, 28 Oct 2014 11:06:00 -0400
changeset 225843 cc85ed51d280
parent 225841 74d96225a2a8
child 225845 76dcced7d838
push id4037
push userrjesup@wgate.com
push date2014-10-29 06:26 +0000
treeherdermozilla-beta@cc85ed51d280 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerstuexen, lsblakk
bugs1079729
milestone34.0
Bug 1079729: Fix handling of increasing number of SCTP channels used by DataChannels r=tuexen a=lsblakk
media/webrtc/signaling/src/sipcc/core/gsm/h/fsm.h
netwerk/sctp/datachannel/DataChannel.cpp
netwerk/sctp/datachannel/DataChannelProtocol.h
--- a/media/webrtc/signaling/src/sipcc/core/gsm/h/fsm.h
+++ b/media/webrtc/signaling/src/sipcc/core/gsm/h/fsm.h
@@ -220,17 +220,17 @@ typedef struct fsmdef_media_t_ {
      * port number used in m= data channel line
      */
     uint16_t       local_datachannel_port;
     uint16_t       remote_datachannel_port;
 
     /*
      * Data Channel properties
      */
-#define WEBRTC_DATACHANNEL_STREAMS_DEFAULT 16
+#define WEBRTC_DATACHANNEL_STREAMS_DEFAULT 256
     uint32         datachannel_streams;
     char           datachannel_protocol[SDP_MAX_STRING_LEN + 1];
 
     /*
      * This field contains the number of elements in the payloads field.
      */
     int32_t num_payloads;
 
--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -909,41 +909,49 @@ DataChannelConnection::FindFreeStream()
 bool
 DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
 {
   struct sctp_status status;
   struct sctp_add_streams sas;
   uint32_t outStreamsNeeded;
   socklen_t len;
 
-  if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS)
+  if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
     aNeeded = MAX_NUM_STREAMS - mStreams.Length();
-  if (aNeeded <= 0)
+  }
+  if (aNeeded <= 0) {
     return false;
+  }
 
   len = (socklen_t)sizeof(struct sctp_status);
   if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
     LOG(("***failed: getsockopt SCTP_STATUS"));
     return false;
   }
   outStreamsNeeded = aNeeded; // number to add
 
-  memset(&sas, 0, sizeof(struct sctp_add_streams));
+  // Note: if multiple channel opens happen when we don't have enough space,
+  // we'll call RequestMoreStreams() multiple times
+  memset(&sas, 0, sizeof(sas));
   sas.sas_instrms = 0;
   sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
   // Doesn't block, we get an event when it succeeds or fails
   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
                          (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
-    if (errno == EALREADY)
+    if (errno == EALREADY) {
+      LOG(("Already have %u output streams", outStreamsNeeded));
       return true;
+    }
 
     LOG(("***failed: setsockopt ADD errno=%d", errno));
     return false;
   }
   LOG(("Requested %u more streams", outStreamsNeeded));
+  // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
+  // values are larger than mStreams.Length()
   return true;
 }
 
 int32_t
 DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
 {
   struct sctp_sndinfo sndinfo;
 
@@ -1049,16 +1057,23 @@ DataChannelConnection::SendDeferredMessa
 
     // Only one of these should be set....
     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
       if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
                                  channel->mStream,
                                  channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
                                  channel->mPrPolicy, channel->mPrValue)) {
         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
+
+        channel->mState = OPEN;
+        channel->mReady = true;
+        LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
+        NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+                                  DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
+                                  channel));
         sent = true;
       } else {
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
           still_blocked = true;
         } else {
           // Close the channel, inform the user
           mStreams[channel->mStream] = nullptr;
           channel->mState = CLOSED;
@@ -1176,16 +1191,17 @@ DataChannelConnection::HandleOpenRequest
     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
       prPolicy = SCTP_PR_SCTP_RTX;
       break;
     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
       prPolicy = SCTP_PR_SCTP_TTL;
       break;
     default:
+      LOG(("Unknown channel type", req->channel_type));
       /* XXX error handling */
       return;
   }
   prValue = ntohl(req->reliability_param);
   flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
 
   if ((channel = FindChannelByStream(stream))) {
     if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
@@ -1202,16 +1218,20 @@ DataChannelConnection::HandleOpenRequest
         LOG(("WARNING: external negotiation mismatch with OpenRequest:"
              "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
              stream, prPolicy, channel->mPrPolicy,
              prValue, channel->mPrValue, flags, channel->mFlags));
       }
     }
     return;
   }
+  if (stream >= mStreams.Length()) {
+    LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length()));
+    return;
+  }
 
   nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
   nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
                                            ntohs(req->protocol_length)));
 
   channel = new DataChannel(this,
                             stream,
                             DataChannel::CONNECTING,
@@ -1219,18 +1239,18 @@ DataChannelConnection::HandleOpenRequest
                             protocol,
                             prPolicy, prValue,
                             flags,
                             nullptr, nullptr);
   mStreams[stream] = channel;
 
   channel->mState = DataChannel::WAITING_TO_OPEN;
 
-  LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u", __FUNCTION__,
-       channel->mLabel.get(), channel->mProtocol.get(), stream));
+  LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
+       channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
   NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
                             DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
                             this, channel));
 
   LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
 
   if (!SendOpenAckMessage(stream)) {
     // XXX Only on EAGAIN!?  And if not, then close the channel??
@@ -1728,23 +1748,24 @@ DataChannelConnection::HandleStreamReset
           // The other side closed the channel
           // We could be in three states:
           // 1. Normal state (input and output streams (OPEN)
           //    Notify application, send a RESET in response on our
           //    outbound channel.  Go to CLOSED
           // 2. We sent our own reset (CLOSING); either they crossed on the
           //    wire, or this is a response to our Reset.
           //    Go to CLOSED
-          // 3. We've sent a open but haven't gotten a response yet (OPENING)
+          // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
           //    I believe this is impossible, as we don't have an input stream yet.
 
           LOG(("Incoming: Channel %u  closed, state %d",
                channel->mStream, channel->mState));
           ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
                         channel->mState == DataChannel::CLOSING ||
+                        channel->mState == DataChannel::CONNECTING ||
                         channel->mState == DataChannel::WAITING_TO_OPEN);
           if (channel->mState == DataChannel::OPEN ||
               channel->mState == DataChannel::WAITING_TO_OPEN) {
             ResetOutgoingStream(channel->mStream);
             SendOutgoingStreamReset();
             NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
                                       DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
                                       channel));
@@ -1780,47 +1801,52 @@ DataChannelConnection::HandleStreamChang
     LOG(("*** Failed increasing number of streams from %u (%u/%u)",
          mStreams.Length(),
          strchg->strchange_instrms,
          strchg->strchange_outstrms));
     // XXX FIX! notify pending opens of failure
     return;
   } else {
     if (strchg->strchange_instrms > mStreams.Length()) {
-      LOG(("Other side increased streamds from %u to %u",
+      LOG(("Other side increased streams from %u to %u",
            mStreams.Length(), strchg->strchange_instrms));
     }
-    if (strchg->strchange_outstrms > mStreams.Length()) {
+    if (strchg->strchange_outstrms > mStreams.Length() ||
+        strchg->strchange_instrms > mStreams.Length()) {
       uint16_t old_len = mStreams.Length();
+      uint16_t new_len = std::max(strchg->strchange_outstrms,
+                                  strchg->strchange_instrms);
       LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
-           old_len,
-           strchg->strchange_outstrms,
-           strchg->strchange_outstrms - old_len,
+           old_len, new_len, new_len - old_len,
            strchg->strchange_instrms));
       // make sure both are the same length
-      mStreams.AppendElements(strchg->strchange_outstrms - old_len);
+      mStreams.AppendElements(new_len - old_len);
       LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
-      for (uint32_t i = old_len; i < mStreams.Length(); ++i) {
+      for (size_t i = old_len; i < mStreams.Length(); ++i) {
         mStreams[i] = nullptr;
       }
       // Re-process any channels waiting for streams.
       // Linear search, but we don't increase channels often and
       // the array would only get long in case of an app error normally
 
       // Make sure we request enough streams if there's a big jump in streams
       // Could make a more complex API for OpenXxxFinish() and avoid this loop
       int32_t num_needed = mPending.GetSize();
       LOG(("%d of %d new streams already needed", num_needed,
-           strchg->strchange_outstrms - old_len));
-      num_needed -= (strchg->strchange_outstrms - old_len); // number we added
+           new_len - old_len));
+      num_needed -= (new_len - old_len); // number we added
       if (num_needed > 0) {
         if (num_needed < 16)
           num_needed = 16;
         LOG(("Not enough new streams, asking for %d more", num_needed));
         RequestMoreStreams(num_needed);
+      } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
+        LOG(("Requesting %d output streams to match partner",
+             strchg->strchange_instrms - strchg->strchange_outstrms));
+        RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
       }
 
       ProcessQueuedOpens();
     }
     // else probably not a change in # of streams
   }
 
   for (i = 0; i < mStreams.Length(); ++i) {
--- a/netwerk/sctp/datachannel/DataChannelProtocol.h
+++ b/netwerk/sctp/datachannel/DataChannelProtocol.h
@@ -12,17 +12,17 @@
 #elif defined(_MSC_VER)
 #pragma pack (push, 1)
 #define SCTP_PACKED
 #else
 #error "Unsupported compiler"
 #endif
 
 // Duplicated in fsm.def
-#define WEBRTC_DATACHANNEL_STREAMS_DEFAULT 16
+#define WEBRTC_DATACHANNEL_STREAMS_DEFAULT 256
 
 #define DATA_CHANNEL_PPID_CONTROL        50
 #define DATA_CHANNEL_PPID_BINARY         52
 #define DATA_CHANNEL_PPID_BINARY_LAST    53
 #define DATA_CHANNEL_PPID_DOMSTRING      54
 #define DATA_CHANNEL_PPID_DOMSTRING_LAST 51
 
 #define DATA_CHANNEL_MAX_BINARY_FRAGMENT 0x4000