Bug 1556795: Allocate stream ids as soon as client/server is negotiated, and try to negotiate id limit increases after. r=ng
authorByron Campen [:bwc] <docfaraday@gmail.com>
Thu, 20 Jun 2019 20:23:57 +0000
changeset 479777 64c2df149745b3b833e39fdb178edb2edae59b78
parent 479776 947f43a8a7127494a533c66c418b7a224a0845e5
child 479778 ea730cd703a2b0ec297d7ed7014c91ac8564dd10
push id88271
push userbcampen@mozilla.com
push dateThu, 20 Jun 2019 23:15:42 +0000
treeherderautoland@fb1e149d91bf [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersng
bugs1556795
milestone69.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1556795: Allocate stream ids as soon as client/server is negotiated, and try to negotiate id limit increases after. r=ng Differential Revision: https://phabricator.services.mozilla.com/D35047
netwerk/sctp/datachannel/DataChannel.cpp
netwerk/sctp/datachannel/DataChannel.h
--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -562,21 +562,16 @@ bool DataChannelConnection::Init(const u
     event.se_type = event_type;
     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event,
                            sizeof(event)) < 0) {
       LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
       goto error_cleanup;
     }
   }
 
-  // Update number of streams
-  mStreams.AppendElements(aNumStreams);
-  for (uint32_t i = 0; i < aNumStreams; ++i) {
-    mStreams[i] = nullptr;
-  }
   memset(&initmsg, 0, sizeof(initmsg));
   len = sizeof(initmsg);
   if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
                          &len) < 0) {
     LOG(("*** failed getsockopt SCTP_INITMSG"));
     goto error_cleanup;
   }
   LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
@@ -662,30 +657,37 @@ bool DataChannelConnection::ConnectToTra
              "SCTP wasn't initialized before ConnectToTransport!");
   if (NS_WARN_IF(aTransportId.empty())) {
     return false;
   }
 
   mLocalPort = localport;
   mRemotePort = remoteport;
   mState = CONNECTING;
-
-  RUN_ON_THREAD(
-      mSTS,
-      WrapRunnable(RefPtr<DataChannelConnection>(this),
-                   &DataChannelConnection::SetSignals, aTransportId, aClient),
-      NS_DISPATCH_NORMAL);
+  mAllocateEven = Some(aClient);
+
+  // Could be faster. Probably doesn't matter.
+  while (auto channel = mChannels.Get(INVALID_STREAM)) {
+    mChannels.Remove(channel);
+    channel->mStream = FindFreeStream();
+    if (channel->mStream != INVALID_STREAM) {
+      mChannels.Insert(channel);
+    }
+  }
+
+  RUN_ON_THREAD(mSTS,
+                WrapRunnable(RefPtr<DataChannelConnection>(this),
+                             &DataChannelConnection::SetSignals, aTransportId),
+                NS_DISPATCH_NORMAL);
   return true;
 }
 
-void DataChannelConnection::SetSignals(const std::string& aTransportId,
-                                       bool aClient) {
+void DataChannelConnection::SetSignals(const std::string& aTransportId) {
   ASSERT_WEBRTC(IsSTSThread());
   mTransportId = aTransportId;
-  mAllocateEven = aClient;
   mTransportHandler->SignalPacketReceived.connect(
       this, &DataChannelConnection::SctpDtlsInput);
   // SignalStateChange() doesn't call you with the initial state
   if (mTransportHandler->GetState(mTransportId, false) ==
       TransportLayer::TS_OPEN) {
     LOG(("Setting transport signals, dtls already open"));
     CompleteConnect();
   } else {
@@ -1020,69 +1022,68 @@ bool DataChannelConnection::Connect(cons
   Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
       DataChannelOnMessageAvailable::ON_CONNECTION, this,
       (DataChannel*)nullptr)));
   return true;
 }
 #endif
 
 DataChannel* DataChannelConnection::FindChannelByStream(uint16_t stream) {
-  return mStreams.SafeElementAt(stream);
+  return mChannels.Get(stream).get();
 }
 
 uint16_t DataChannelConnection::FindFreeStream() {
-  uint32_t i, j, limit;
-
-  limit = mStreams.Length();
-  if (limit > MAX_NUM_STREAMS) limit = MAX_NUM_STREAMS;
-
-  for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
-    if (!mStreams[i]) {
-      // Verify it's not still in the process of closing
-      for (j = 0; j < mStreamsResetting.Length(); ++j) {
-        if (mStreamsResetting[j] == i) {
-          break;
-        }
+  ASSERT_WEBRTC(NS_IsMainThread());
+  uint16_t i, limit;
+
+  limit = MAX_NUM_STREAMS;
+
+  MOZ_ASSERT(mAllocateEven.isSome());
+  for (i = (*mAllocateEven ? 0 : 1); i < limit; i += 2) {
+    if (mChannels.Get(i)) {
+      continue;
+    }
+
+    // Verify it's not still in the process of closing
+    size_t j;
+    for (j = 0; j < mStreamsResetting.Length(); ++j) {
+      if (mStreamsResetting[j] == i) {
+        break;
       }
-      if (j == mStreamsResetting.Length()) break;
+    }
+
+    if (j == mStreamsResetting.Length()) {
+      return i;
     }
   }
-  if (i >= limit) {
-    return INVALID_STREAM;
-  }
-  return i;
+  return INVALID_STREAM;
 }
 
 uint32_t DataChannelConnection::UpdateCurrentStreamIndex() {
-  if (mCurrentStream == mStreams.Length() - 1) {
+  RefPtr<DataChannel> channel = mChannels.GetNextChannel(mCurrentStream);
+  if (!channel) {
     mCurrentStream = 0;
   } else {
-    ++mCurrentStream;
+    mCurrentStream = channel->mStream;
   }
-
   return mCurrentStream;
 }
 
 uint32_t DataChannelConnection::GetCurrentStreamIndex() {
-  // Fix current stream index (in case #streams decreased)
-  if (mCurrentStream >= mStreams.Length()) {
-    mCurrentStream = 0;
-  }
-
   return mCurrentStream;
 }
 
 bool DataChannelConnection::RequestMoreStreams(int32_t aNeeded) {
   struct sctp_status status;
   struct sctp_add_streams sas;
   uint32_t outStreamsNeeded;
   socklen_t len;
 
-  if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
-    aNeeded = MAX_NUM_STREAMS - mStreams.Length();
+  if (aNeeded + mNegotiatedIdLimit > MAX_NUM_STREAMS) {
+    aNeeded = MAX_NUM_STREAMS - mNegotiatedIdLimit;
   }
   if (aNeeded <= 0) {
     return false;
   }
 
   len = (socklen_t)sizeof(struct sctp_status);
   if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status,
                          &len) < 0) {
@@ -1103,18 +1104,18 @@ bool DataChannelConnection::RequestMoreS
       LOG(("Already have %u output streams", outStreamsNeeded));
       return true;
     }
 
     LOG(("***failed: setsockopt ADD errno=%d", errno));
     return false;
   }
   LOG(("Requested %u more streams", outStreamsNeeded));
-  // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
-  // values are larger than mStreams.Length()
+  // We add to mNegotiatedIdLimit when we get a SCTP_STREAM_CHANGE_EVENT and the
+  // values are larger than mNegotiatedIdLimit
   return true;
 }
 
 // Returns a POSIX error code.
 int DataChannelConnection::SendControlMessage(const uint8_t* data, uint32_t len,
                                               uint16_t stream) {
   struct sctp_sendv_spa info = {0};
 
@@ -1237,17 +1238,17 @@ bool DataChannelConnection::SendDeferred
     // Note: There may or may not be pending data messages
     mPendingType = PENDING_DATA;
   }
 
   bool blocked = false;
   uint32_t i = GetCurrentStreamIndex();
   uint32_t end = i;
   do {
-    channel = mStreams[i];
+    channel = mChannels.Get(i);
     // Should already be cleared if closing/closed
     if (!channel || channel->mBufferedData.IsEmpty()) {
       i = UpdateCurrentStreamIndex();
       continue;
     }
 
     // Send buffered data messages
     // Warning: This will fail in case ndata is inactive and a previously
@@ -1370,31 +1371,31 @@ void DataChannelConnection::HandleOpenRe
             ("WARNING: external negotiation mismatch with OpenRequest:"
              "channel %u, policy %u/%u, value %u/%u, ordered %d/%d",
              stream, prPolicy, channel->mPrPolicy, prValue, channel->mPrValue,
              static_cast<int>(ordered), static_cast<int>(channel->mOrdered)));
       }
     }
     return;
   }
-  if (stream >= mStreams.Length()) {
+  if (stream >= mNegotiatedIdLimit) {
     LOG(("%s: stream %u out of bounds (%zu)", __FUNCTION__, stream,
-         mStreams.Length()));
+         mNegotiatedIdLimit));
     return;
   }
 
   nsCString label(
       nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
   nsCString protocol(nsDependentCSubstring(
       &req->label[ntohs(req->label_length)], ntohs(req->protocol_length)));
 
   channel =
       new DataChannel(this, stream, DataChannel::OPEN, label, protocol,
                       prPolicy, prValue, ordered, false, nullptr, nullptr);
-  mStreams[stream] = channel;
+  mChannels.Insert(channel);
 
   LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u", __FUNCTION__,
        channel->mLabel.get(), channel->mProtocol.get(), stream));
   Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
       DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, this, channel)));
 
   LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__,
        channel.get()));
@@ -1984,17 +1985,19 @@ void DataChannelConnection::ClearResets(
   }
 
   for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
     RefPtr<DataChannel> channel;
     channel = FindChannelByStream(mStreamsResetting[i]);
     if (channel) {
       LOG(("Forgetting channel %u (%p) with pending reset", channel->mStream,
            channel.get()));
-      mStreams[channel->mStream] = nullptr;
+      // TODO: Do we _really_ want to remove this? Are we allowed to reuse the
+      // id?
+      mChannels.Remove(channel);
     }
   }
   mStreamsResetting.Clear();
 }
 
 void DataChannelConnection::ResetOutgoingStream(uint16_t stream) {
   uint32_t i;
 
@@ -2066,21 +2069,20 @@ void DataChannelConnection::HandleStream
           // 2. We sent our own reset (CLOSING); either they crossed on the
           //    wire, or this is a response to our Reset.
           //    Go to CLOSED
           // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
           //    I believe this is impossible, as we don't have an input stream
           //    yet.
 
           LOG(("Incoming: Channel %u  closed", channel->mStream));
-          if (mStreams[channel->mStream]) {
+          if (mChannels.Remove(channel)) {
             // Mark the stream for reset (the reset is sent below)
             ResetOutgoingStream(channel->mStream);
           }
-          mStreams[channel->mStream] = nullptr;
 
           LOG(("Disconnected DataChannel %p from connection %p",
                (void*)channel.get(), (void*)channel->mConnection.get()));
           channel->StreamClosedLocked();
         } else {
           LOG(("Can't find incoming channel %d", i));
         }
       }
@@ -2091,104 +2093,71 @@ void DataChannelConnection::HandleStream
   if (!mStreamsResetting.IsEmpty()) {
     LOG(("Sending %zu pending resets", mStreamsResetting.Length()));
     SendOutgoingStreamReset();
   }
 }
 
 void DataChannelConnection::HandleStreamChangeEvent(
     const struct sctp_stream_change_event* strchg) {
-  uint16_t stream;
-  RefPtr<DataChannel> channel;
-
   if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
     LOG(("*** Failed increasing number of streams from %zu (%u/%u)",
-         mStreams.Length(), strchg->strchange_instrms,
+         mNegotiatedIdLimit, strchg->strchange_instrms,
          strchg->strchange_outstrms));
     // XXX FIX! notify pending opens of failure
     return;
   }
-  if (strchg->strchange_instrms > mStreams.Length()) {
-    LOG(("Other side increased streams from %zu to %u", mStreams.Length(),
+  if (strchg->strchange_instrms > mNegotiatedIdLimit) {
+    LOG(("Other side increased streams from %zu to %u", mNegotiatedIdLimit,
          strchg->strchange_instrms));
   }
-  if (strchg->strchange_outstrms > mStreams.Length() ||
-      strchg->strchange_instrms > mStreams.Length()) {
-    uint16_t old_len = mStreams.Length();
-    uint16_t new_len =
-        std::max(strchg->strchange_outstrms, strchg->strchange_instrms);
+  uint16_t old_limit = mNegotiatedIdLimit;
+  uint16_t new_limit =
+      std::max(strchg->strchange_outstrms, strchg->strchange_instrms);
+  if (new_limit > mNegotiatedIdLimit) {
     LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
-         old_len, new_len, new_len - old_len, strchg->strchange_instrms));
+         old_limit, new_limit, new_limit - old_limit,
+         strchg->strchange_instrms));
     // make sure both are the same length
-    mStreams.AppendElements(new_len - old_len);
-    LOG(("New length = %zu (was %d)", mStreams.Length(), old_len));
-    for (size_t i = old_len; i < mStreams.Length(); ++i) {
-      mStreams[i] = nullptr;
-    }
+    mNegotiatedIdLimit = new_limit;
+    LOG(("New length = %zu (was %d)", mNegotiatedIdLimit, old_limit));
     // Re-process any channels waiting for streams.
     // Linear search, but we don't increase channels often and
     // the array would only get long in case of an app error normally
 
     // Make sure we request enough streams if there's a big jump in streams
     // Could make a more complex API for OpenXxxFinish() and avoid this loop
-    size_t num_needed = mPending.GetSize();
-    LOG(("%zu of %d new streams already needed", num_needed,
-         new_len - old_len));
-    num_needed -= (new_len - old_len);  // number we added
-    if (num_needed > 0) {
-      if (num_needed < 16) num_needed = 16;
-      LOG(("Not enough new streams, asking for %zu more", num_needed));
+    auto channels = mChannels.GetAll();
+    size_t num_needed =
+        channels.Length() ? (channels.LastElement()->mStream + 1) : 0;
+    MOZ_ASSERT(num_needed != INVALID_STREAM);
+    if (num_needed > new_limit) {
+      int32_t more_needed = num_needed - ((int32_t)mNegotiatedIdLimit) + 16;
+      LOG(("Not enough new streams, asking for %d more", more_needed));
       // TODO: parameter is an int32_t but we pass size_t
-      RequestMoreStreams(num_needed);
+      RequestMoreStreams(more_needed);
     } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
       LOG(("Requesting %d output streams to match partner",
            strchg->strchange_instrms - strchg->strchange_outstrms));
       RequestMoreStreams(strchg->strchange_instrms -
                          strchg->strchange_outstrms);
     }
 
     ProcessQueuedOpens();
   }
   // else probably not a change in # of streams
 
-  for (uint32_t i = 0; i < mStreams.Length(); ++i) {
-    channel = mStreams[i];
-    if (!channel) continue;
-
-    if (channel->mStream == INVALID_STREAM) {
-      if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
-          (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
+  if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
+      (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
+    // Other side denied our request. Need to AnnounceClosed some stuff.
+    for (auto& channel : mChannels.GetAll()) {
+      if (channel->mStream >= mNegotiatedIdLimit) {
         /* XXX: Signal to the other end. */
         channel->AnnounceClosed();
         // maybe fire onError (bug 843625)
-      } else {
-        stream = FindFreeStream();
-        if (stream != INVALID_STREAM) {
-          channel->mStream = stream;
-          mStreams[stream] = channel;
-
-          // Send open request
-          int error = SendOpenRequestMessage(
-              channel->mLabel, channel->mProtocol, channel->mStream,
-              !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
-              channel->mPrPolicy, channel->mPrValue);
-          if (error) {
-            LOG(("SendOpenRequest failed, error = %d", error));
-            // Close the channel, inform the user
-            mStreams[channel->mStream] = nullptr;
-            channel->AnnounceClosed();
-            // Don't need to reset; we didn't open it
-          } else {
-            channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
-            channel->AnnounceOpen();
-          }
-        } else {
-          /* We will not find more ... */
-          break;
-        }
       }
     }
   }
 }
 
 // Called with mLock locked!
 void DataChannelConnection::HandleNotification(
     const union sctp_notification* notif, size_t n) {
@@ -2275,18 +2244,26 @@ int DataChannelConnection::ReceiveCallba
   return 1;
 }
 
 already_AddRefed<DataChannel> DataChannelConnection::Open(
     const nsACString& label, const nsACString& protocol, Type type,
     bool inOrder, uint32_t prValue, DataChannelListener* aListener,
     nsISupports* aContext, bool aExternalNegotiated, uint16_t aStream) {
   if (!aExternalNegotiated) {
-    // aStream == INVALID_STREAM to have the protocol allocate
-    aStream = INVALID_STREAM;
+    if (mAllocateEven.isSome()) {
+      aStream = FindFreeStream();
+      if (aStream == INVALID_STREAM) {
+        return nullptr;
+      }
+    } else {
+      // We do not yet know whether we are client or server, and an id has not
+      // been chosen for us. We will need to choose later.
+      aStream = INVALID_STREAM;
+    }
   }
   uint16_t prPolicy = SCTP_PR_SCTP_NONE;
 
   LOG(
       ("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, "
        "context %p, external: %s, stream %u",
        PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
        type, inOrder, prValue, aListener, aContext,
@@ -2305,40 +2282,39 @@ already_AddRefed<DataChannel> DataChanne
       LOG(("ERROR: unsupported channel type: %u", type));
       MOZ_ASSERT(false);
       return nullptr;
   }
   if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
     return nullptr;
   }
 
-  // Don't look past currently-negotiated streams
-  if (aStream != INVALID_STREAM && aStream < mStreams.Length() &&
-      mStreams[aStream]) {
+  if (aStream != INVALID_STREAM && mChannels.Get(aStream)) {
     LOG(("ERROR: external negotiation of already-open channel %u", aStream));
     // XXX How do we indicate this up to the application?  Probably the
     // caller's job, but we may need to return an error code.
     return nullptr;
   }
 
   RefPtr<DataChannel> channel(new DataChannel(
       this, aStream, DataChannel::CONNECTING, label, protocol, prPolicy,
       prValue, inOrder, aExternalNegotiated, aListener, aContext));
+  mChannels.Insert(channel);
 
   MutexAutoLock lock(mLock);  // OpenFinish assumes this
   return OpenFinish(channel.forget());
 }
 
 // Separate routine so we can also call it to finish up from pending opens
 already_AddRefed<DataChannel> DataChannelConnection::OpenFinish(
     already_AddRefed<DataChannel>&& aChannel) {
   RefPtr<DataChannel> channel(aChannel);  // takes the reference passed in
   // Normally 1 reference if called from ::Open(), or 2 if called from
   // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
-  uint16_t stream = channel->mStream;
+  const uint16_t stream = channel->mStream;
   bool queue = false;
 
   mLock.AssertCurrentThreadOwns();
 
   // Cases we care about:
   // Pre-negotiated:
   //    Not Open:
   //      Doesn't fit:
@@ -2358,62 +2334,53 @@ already_AddRefed<DataChannel> DataChanne
   //         -> RequestMoreStreams && queue
   //      Does fit:
   //         -> open
   // So the Open cases are basically the same
   // Not Open cases are simply queue for non-negotiated, and
   // either change the initial ask or possibly renegotiate after open.
 
   if (mState == OPEN) {
-    if (stream == INVALID_STREAM) {
-      stream = FindFreeStream();  // may be INVALID_STREAM if we need more
-    }
-    if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
+    MOZ_ASSERT(stream != INVALID_STREAM);
+    if (stream >= mNegotiatedIdLimit) {
       // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra
       // streams to avoid going back immediately for more if the ask to N, N+1,
       // etc
-      int32_t more_needed = (stream == INVALID_STREAM)
-                                ? 16
-                                : (stream - ((int32_t)mStreams.Length())) + 16;
+      int32_t more_needed = stream - ((int32_t)mNegotiatedIdLimit) + 16;
       if (!RequestMoreStreams(more_needed)) {
         // Something bad happened... we're done
         goto request_error_cleanup;
       }
       queue = true;
     }
   } else {
     // not OPEN
-    if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
+    if (stream != INVALID_STREAM && stream >= mNegotiatedIdLimit &&
         mState == CLOSED) {
       // Update number of streams for init message
       struct sctp_initmsg initmsg;
       socklen_t len = sizeof(initmsg);
-      int32_t total_needed = stream + 16;
+      uint16_t total_needed =
+          (stream < UINT16_MAX - 16) ? stream + 16 : UINT16_MAX;
 
       memset(&initmsg, 0, sizeof(initmsg));
       if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG,
                              &initmsg, &len) < 0) {
         LOG(("*** failed getsockopt SCTP_INITMSG"));
         goto request_error_cleanup;
       }
       LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
            initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
       initmsg.sinit_num_ostreams = total_needed;
       initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
       if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG,
                              &initmsg, (socklen_t)sizeof(initmsg)) < 0) {
         LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
         goto request_error_cleanup;
       }
-
-      int32_t old_len = mStreams.Length();
-      mStreams.AppendElements(total_needed - old_len);
-      for (int32_t i = old_len; i < total_needed; ++i) {
-        mStreams[i] = nullptr;
-      }
     }
     // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
     // is called, if needed
     queue = true;
   }
   if (queue) {
     LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
     // Also serves to mark we told the app
@@ -2421,19 +2388,17 @@ already_AddRefed<DataChannel> DataChanne
     // we need a ref for the nsDeQue and one to return
     DataChannel* rawChannel = channel;
     rawChannel->AddRef();
     mPending.Push(rawChannel);
     return channel.forget();
   }
 
   MOZ_ASSERT(stream != INVALID_STREAM);
-  // just allocated (& OPEN), or externally negotiated
-  mStreams[stream] = channel;  // holds a reference
-  channel->mStream = stream;
+  MOZ_ASSERT(stream < mNegotiatedIdLimit);
 
 #ifdef TEST_QUEUED_DATA
   // It's painful to write a test for this...
   channel->AnnounceOpen();
   channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
   SendDataMsgInternalOrBuffer(channel, "Help me!", 8,
                               DATA_CHANNEL_PPID_DOMSTRING);
 #endif
@@ -2452,18 +2417,17 @@ already_AddRefed<DataChannel> DataChanne
       LOG(("SendOpenRequest failed, error = %d", error));
       if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
         // We already returned the channel to the app.
         NS_ERROR("Failed to send open request");
         channel->AnnounceClosed();
       }
       // If we haven't returned the channel yet, it will get destroyed when we
       // exit this function.
-      mStreams[stream] = nullptr;
-      channel->mStream = INVALID_STREAM;
+      mChannels.Remove(channel);
       // we'll be destroying the channel
       return nullptr;
       /* NOTREACHED */
     }
   }
 
   // Either externally negotiated or we sent Open
   channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
@@ -2738,17 +2702,17 @@ class ReadBlobRunnable : public Runnable
   RefPtr<DataChannelConnection> mConnection;
   uint16_t mStream;
   // Use RefCount for preventing the object is deleted when SendBlob returns.
   RefPtr<nsIInputStream> mBlob;
 };
 
 // Returns a POSIX error code.
 int DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream* aBlob) {
-  DataChannel* channel = mStreams[stream];
+  RefPtr<DataChannel> channel = mChannels.Get(stream);
   if (NS_WARN_IF(!channel)) {
     return EINVAL;  // TODO: Find a better error code
   }
 
   // Spawn a thread to send the data
   if (!mInternalIOThread) {
     nsresult rv =
         NS_NewNamedThread("DataChannel IO", getter_AddRefs(mInternalIOThread));
@@ -2824,47 +2788,37 @@ void DataChannelConnection::ReadBlob(
     NS_ReleaseOnMainThreadSystemGroup("DataChannelBlobSendRunnable",
                                       runnable.forget());
     return;
   }
   aBlob->Close();
   Dispatch(runnable.forget());
 }
 
-void DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList) {
-  ASSERT_WEBRTC(NS_IsMainThread());
-  for (uint32_t i = 0; i < mStreams.Length(); ++i) {
-    if (mStreams[i]) {
-      aStreamList->push_back(mStreams[i]->mStream);
-    }
-  }
-}
-
 // Returns a POSIX error code.
 int DataChannelConnection::SendDataMsgCommon(uint16_t stream,
                                              const nsACString& aMsg,
                                              bool isBinary) {
   ASSERT_WEBRTC(NS_IsMainThread());
   // We really could allow this from other threads, so long as we deal with
   // asynchronosity issues with channels closing, in particular access to
-  // mStreams, and issues with the association closing (access to mSocket).
+  // mChannels, and issues with the association closing (access to mSocket).
 
   const uint8_t* data = (const uint8_t*)aMsg.BeginReading();
   uint32_t len = aMsg.Length();
 #if (UINT32_MAX > SIZE_MAX)
   if (len > SIZE_MAX) {
     return EMSGSIZE;
   }
 #endif
-  DataChannel* channelPtr;
 
   LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream,
        len));
   // XXX if we want more efficiency, translate flags once at open time
-  channelPtr = mStreams[stream];
+  RefPtr<DataChannel> channelPtr = mChannels.Get(stream);
   if (NS_WARN_IF(!channelPtr)) {
     return EINVAL;  // TODO: Find a better error code
   }
 
   auto& channel = *channelPtr;
 
   if (isBinary) {
     return SendDataMsg(channel, data, len, DATA_CHANNEL_PPID_BINARY_PARTIAL,
@@ -2889,34 +2843,34 @@ void DataChannelConnection::Close(DataCh
 // Called from someone who holds a ref via ::Close(), or from ~DataChannel
 void DataChannelConnection::CloseInt(DataChannel* aChannel) {
   MOZ_ASSERT(aChannel);
   RefPtr<DataChannel> channel(aChannel);  // make sure it doesn't go away on us
 
   mLock.AssertCurrentThreadOwns();
   LOG(("Connection %p/Channel %p: Closing stream %u",
        channel->mConnection.get(), channel.get(), channel->mStream));
+
+  aChannel->mBufferedData.Clear();
+  if (mState == CLOSED) {
+    // If we're CLOSING, we might leave this in place until we can send a
+    // reset.
+    mChannels.Remove(channel);
+  }
+
   // re-test since it may have closed before the lock was grabbed
   if (aChannel->mReadyState == CLOSED || aChannel->mReadyState == CLOSING) {
     LOG(("Channel already closing/closed (%u)", aChannel->mReadyState));
-    if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
-      // called from CloseAll()
-      // we're not going to hang around waiting any more
-      mStreams[channel->mStream] = nullptr;
-    }
     return;
   }
-  aChannel->mBufferedData.Clear();
+
   if (channel->mStream != INVALID_STREAM) {
     ResetOutgoingStream(channel->mStream);
-    if (mState == CLOSED) {  // called from CloseAll()
-      // Let resets accumulate then send all at once in CloseAll()
-      // we're not going to hang around waiting
-      mStreams[channel->mStream] = nullptr;
-    } else {
+    if (mState != CLOSED) {
+      // Individual channel is being closed, send reset now.
       SendOutgoingStreamReset();
     }
   }
   aChannel->mReadyState = CLOSING;
   if (mState == CLOSED) {
     // we're not going to hang around waiting
     channel->StreamClosedLocked();
   }
@@ -2932,39 +2886,99 @@ void DataChannelConnection::CloseAll() {
   {
     MutexAutoLock lock(mLock);
     mState = CLOSED;
   }
 
   // Close current channels
   // If there are runnables, they hold a strong ref and keep the channel
   // and/or connection alive (even if in a CLOSED state)
-  bool closed_some = false;
-  for (uint32_t i = 0; i < mStreams.Length(); ++i) {
-    if (mStreams[i]) {
-      mStreams[i]->Close();
-      closed_some = true;
-    }
+  for (auto& channel : mChannels.GetAll()) {
+    channel->Close();
   }
 
   // Clean up any pending opens for channels
   RefPtr<DataChannel> channel;
   while (nullptr != (channel = dont_AddRef(
                          static_cast<DataChannel*>(mPending.PopFront())))) {
     LOG(("closing pending channel %p, stream %u", channel.get(),
          channel->mStream));
     channel->Close();  // also releases the ref on each iteration
-    closed_some = true;
   }
   // It's more efficient to let the Resets queue in shutdown and then
   // SendOutgoingStreamReset() here.
-  if (closed_some) {
-    MutexAutoLock lock(mLock);
-    SendOutgoingStreamReset();
+  MutexAutoLock lock(mLock);
+  SendOutgoingStreamReset();
+}
+
+bool DataChannelConnection::Channels::IdComparator::Equals(
+    const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
+  return aChannel->mStream == aId;
+}
+
+bool DataChannelConnection::Channels::IdComparator::LessThan(
+    const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
+  return aChannel->mStream < aId;
+}
+
+bool DataChannelConnection::Channels::IdComparator::Equals(
+    const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const {
+  return Equals(a1, a2->mStream);
+}
+
+bool DataChannelConnection::Channels::IdComparator::LessThan(
+    const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const {
+  return LessThan(a1, a2->mStream);
+}
+
+void DataChannelConnection::Channels::Insert(
+    const RefPtr<DataChannel>& aChannel) {
+  LOG(("Inserting channel %u : %p", aChannel->mStream, aChannel.get()));
+  MutexAutoLock lock(mMutex);
+  if (aChannel->mStream != INVALID_STREAM) {
+    MOZ_ASSERT(!mChannels.ContainsSorted(aChannel, IdComparator()));
   }
+
+  MOZ_ASSERT(!mChannels.Contains(aChannel));
+
+  mChannels.InsertElementSorted(aChannel, IdComparator());
+}
+
+bool DataChannelConnection::Channels::Remove(
+    const RefPtr<DataChannel>& aChannel) {
+  LOG(("Removing channel %u : %p", aChannel->mStream, aChannel.get()));
+  MutexAutoLock lock(mMutex);
+  if (aChannel->mStream == INVALID_STREAM) {
+    return mChannels.RemoveElement(aChannel);
+  }
+
+  return mChannels.RemoveElementSorted(aChannel, IdComparator());
+}
+
+RefPtr<DataChannel> DataChannelConnection::Channels::Get(uint16_t aId) const {
+  MutexAutoLock lock(mMutex);
+  auto index = mChannels.BinaryIndexOf(aId, IdComparator());
+  if (index == ChannelArray::NoIndex) {
+    return nullptr;
+  }
+  return mChannels[index];
+}
+
+RefPtr<DataChannel> DataChannelConnection::Channels::GetNextChannel(
+    uint16_t aCurrentId) const {
+  MutexAutoLock lock(mMutex);
+  if (mChannels.IsEmpty()) {
+    return nullptr;
+  }
+
+  auto index = mChannels.IndexOfFirstElementGt(aCurrentId, IdComparator());
+  if (index == mChannels.Length()) {
+    index = 0;
+  }
+  return mChannels[index];
 }
 
 DataChannel::~DataChannel() {
   // NS_ASSERTION since this is more "I think I caught all the cases that
   // can cause this" than a true kill-the-program assertion.  If this is
   // wrong, nothing bad happens.  A worst it's a leak.
   NS_ASSERTION(mReadyState == CLOSED || mReadyState == CLOSING,
                "unexpected state in ~DataChannel");
@@ -2981,18 +2995,16 @@ void DataChannel::Close() {
 // Used when disconnecting from the DataChannelConnection
 void DataChannel::StreamClosedLocked() {
   mConnection->mLock.AssertCurrentThreadOwns();
   ENSURE_DATACONNECTION;
 
   LOG(("Destroying Data channel %u", mStream));
   MOZ_ASSERT_IF(mStream != INVALID_STREAM,
                 !mConnection->FindChannelByStream(mStream));
-  // Spec doesn't say to mess with the stream id...
-  mStream = INVALID_STREAM;
   AnnounceClosed();
   // We leave mConnection live until the DOM releases us, to avoid races
 }
 
 void DataChannel::ReleaseConnection() {
   ASSERT_WEBRTC(NS_IsMainThread());
   mConnection = nullptr;
 }
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -159,17 +159,17 @@ class DataChannelConnection final : publ
 #endif
 
 #ifdef SCTP_DTLS_SUPPORTED
   bool ConnectToTransport(const std::string& aTransportId, bool aClient,
                           uint16_t localport, uint16_t remoteport);
   void TransportStateChange(const std::string& aTransportId,
                             TransportLayer::State aState);
   void CompleteConnect();
-  void SetSignals(const std::string& aTransportId, bool aClient);
+  void SetSignals(const std::string& aTransportId);
 #endif
 
   typedef enum {
     RELIABLE = 0,
     PARTIAL_RELIABLE_REXMIT = 1,
     PARTIAL_RELIABLE_TIMED = 2
   } Type;
 
@@ -211,18 +211,16 @@ class DataChannelConnection final : publ
   }
 
   friend class DataChannel;
   Mutex mLock;
 
   void ReadBlob(already_AddRefed<DataChannelConnection> aThis, uint16_t aStream,
                 nsIInputStream* aBlob);
 
-  void GetStreamIds(std::vector<uint16_t>* aStreamList);
-
   bool SendDeferredMessages();
 
  protected:
   friend class DataChannelOnMessageAvailable;
   // Avoid cycles with PeerConnectionImpl
   // Use from main thread only as WeakPtr is not threadsafe
   WeakPtr<DataConnectionListener> mListener;
 
@@ -305,35 +303,66 @@ class DataChannelConnection final : publ
     bool on = false;
     if (mSTS) {
       mSTS->IsOnCurrentThread(&on);
     }
     return on;
   }
 #endif
 
+  class Channels {
+   public:
+    Channels() : mMutex("DataChannelConnection::Channels::mMutex") {}
+    void Insert(const RefPtr<DataChannel>& aChannel);
+    bool Remove(const RefPtr<DataChannel>& aChannel);
+    RefPtr<DataChannel> Get(uint16_t aId) const;
+    typedef AutoTArray<RefPtr<DataChannel>, 16> ChannelArray;
+    ChannelArray GetAll() const {
+      MutexAutoLock lock(mMutex);
+      return mChannels;
+    }
+    RefPtr<DataChannel> GetNextChannel(uint16_t aCurrentId) const;
+
+   private:
+    struct IdComparator {
+      bool Equals(const RefPtr<DataChannel>& aChannel, uint16_t aId) const;
+      bool LessThan(const RefPtr<DataChannel>& aChannel, uint16_t aId) const;
+      bool Equals(const RefPtr<DataChannel>& a1,
+                  const RefPtr<DataChannel>& a2) const;
+      bool LessThan(const RefPtr<DataChannel>& a1,
+                    const RefPtr<DataChannel>& a2) const;
+    };
+    mutable Mutex mMutex;
+    ChannelArray mChannels;
+  };
+
   bool mSendInterleaved = false;
   bool mMaxMessageSizeSet = false;
   uint64_t mMaxMessageSize = 0;
-  bool mAllocateEven = false;
+  // Main thread only
+  Maybe<bool> mAllocateEven;
   // Data:
-  // NOTE: while this array will auto-expand, increases in the number of
+  // NOTE: while this container will auto-expand, increases in the number of
   // channels available from the stack must be negotiated!
-  AutoTArray<RefPtr<DataChannel>, 16> mStreams;
+  // Accessed from both main and sts, API is threadsafe
+  Channels mChannels;
+  // STS only
   uint32_t mCurrentStream = 0;
   nsDeque mPending;  // Holds addref'ed DataChannel's -- careful!
+  // STS and main
+  size_t mNegotiatedIdLimit = 0;  // GUARDED_BY(mConnection->mLock)
   uint8_t mPendingType = PENDING_NONE;
   // holds data that's come in before a channel is open
   nsTArray<nsAutoPtr<QueuedDataMessage>> mQueuedData;
   // holds outgoing control messages
   nsTArray<nsAutoPtr<BufferedOutgoingMsg>>
       mBufferedControl;  // GUARDED_BY(mConnection->mLock)
 
-  // Streams pending reset
-  AutoTArray<uint16_t, 4> mStreamsResetting;
+  // Streams pending reset. Accessed from main and STS.
+  AutoTArray<uint16_t, 4> mStreamsResetting;  // GUARDED_BY(mConnection->mLock)
   // accessed from STS thread
   struct socket* mMasterSocket = nullptr;
   // cloned from mMasterSocket on successful Connect on STS thread
   struct socket* mSocket = nullptr;
   uint16_t mState = CLOSED;  // Protected with mLock
 
 #ifdef SCTP_DTLS_SUPPORTED
   std::string mTransportId;
@@ -488,16 +517,17 @@ class DataChannel {
   RefPtr<DataChannelConnection> mConnection;
   nsCString mLabel;
   nsCString mProtocol;
   // This is mainthread only
   uint16_t mReadyState;
   uint16_t mStream;
   uint16_t mPrPolicy;
   uint32_t mPrValue;
+  // Accessed on main and STS
   const bool mNegotiated;
   const bool mOrdered;
   uint32_t mFlags;
   uint32_t mId;
   bool mIsRecvBinary;
   size_t mBufferedThreshold;
   // Read/written on main only. Decremented via message-passing, because the
   // spec requires us to queue a task for this.