Bug 930778: Support DataChannel ACK for unordered channels r=tuexen
authorRandell Jesup <rjesup@jesup.org>
Fri, 25 Oct 2013 16:08:18 -0400
changeset 166111 86cf81278443ce7c482266123e10f1e20bcd2066
parent 166110 cee39f8dc0490c1446237c2fedcb08dedf4cb468
child 166112 dd7618acdabd7949586dc5e3b3fbb156ed817059
push id3066
push userakeybl@mozilla.com
push dateMon, 09 Dec 2013 19:58:46 +0000
treeherdermozilla-beta@a31a0dce83aa [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerstuexen
bugs930778
milestone27.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 930778: Support DataChannel ACK for unordered channels r=tuexen
netwerk/sctp/datachannel/DataChannel.cpp
netwerk/sctp/datachannel/DataChannel.h
netwerk/sctp/datachannel/DataChannelProtocol.h
--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -843,19 +843,19 @@ DataChannelConnection::Connect(const cha
   NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
                             DataChannelOnMessageAvailable::ON_CONNECTION,
                             this, (DataChannel *) nullptr));
   return true;
 }
 #endif
 
 DataChannel *
-DataChannelConnection::FindChannelByStream(uint16_t streamOut)
+DataChannelConnection::FindChannelByStream(uint16_t stream)
 {
-  return mStreams.SafeElementAt(streamOut);
+  return mStreams.SafeElementAt(stream);
 }
 
 uint16_t
 DataChannelConnection::FindFreeStream()
 {
   uint32_t i, j, limit;
 
   limit = mStreams.Length();
@@ -930,16 +930,27 @@ DataChannelConnection::SendControlMessag
                     SCTP_SENDV_SNDINFO, 0) < 0) {
     //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
     return (0);
   }
   return (1);
 }
 
 int32_t
+DataChannelConnection::SendOpenAckMessage(uint16_t stream)
+{
+  struct rtcweb_datachannel_ack ack;
+
+  memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
+  ack.msg_type = DATA_CHANNEL_ACK;
+
+  return SendControlMessage(&ack, sizeof(ack), stream);
+}
+
+int32_t
 DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
                                               const nsACString& protocol,
                                               uint16_t stream, bool unordered,
                                               uint16_t prPolicy, uint32_t prValue)
 {
   int label_len = label.Length(); // not including nul
   int proto_len = protocol.Length(); // not including nul
   struct rtcweb_datachannel_open_request *req =
@@ -1030,16 +1041,33 @@ DataChannelConnection::SendDeferredMessa
                                     DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
                                     channel));
         }
       }
     }
     if (still_blocked)
       break;
 
+    if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
+      if (SendOpenAckMessage(channel->mStream)) {
+        channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
+        sent = true;
+      } else {
+        if (errno == EAGAIN || errno == EWOULDBLOCK) {
+          still_blocked = true;
+        } else {
+          // Close the channel, inform the user
+          CloseInt(channel);
+          // XXX send error via DataChannelOnMessageAvailable (bug 843625)
+        }
+      }
+    }
+    if (still_blocked)
+      break;
+
     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
       bool failed_send = false;
       int32_t result;
 
       if (channel->mState == CLOSED || channel->mState == CLOSING) {
         channel->mBufferedData.Clear();
       }
       while (!channel->mBufferedData.IsEmpty() &&
@@ -1100,22 +1128,24 @@ DataChannelConnection::HandleOpenRequest
   nsRefPtr<DataChannel> channel;
   uint32_t prValue;
   uint16_t prPolicy;
   uint32_t flags;
 
   mLock.AssertCurrentThreadOwns();
 
   if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
-    LOG(("Inconsistent length: %u, should be %u", length,
+    LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
          (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
     if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
       return;
   }
 
+  LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
+
   switch (req->channel_type) {
     case DATA_CHANNEL_RELIABLE:
     case DATA_CHANNEL_RELIABLE_UNORDERED:
       prPolicy = SCTP_PR_SCTP_NONE;
       break;
     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
       prPolicy = SCTP_PR_SCTP_RTX;
@@ -1171,22 +1201,30 @@ DataChannelConnection::HandleOpenRequest
   LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u", __FUNCTION__,
        channel->mLabel.get(), channel->mProtocol.get(), stream));
   NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
                             DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
                             this, channel));
 
   LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
 
+  if (!SendOpenAckMessage(stream)) {
+    // XXX Only on EAGAIN!?  And if not, then close the channel??
+    channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
+    StartDefer();
+  }
+
   // Now process any queued data messages for the channel (which will
   // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
   // more that come in before that happens)
   DeliverQueuedData(stream);
 }
 
+// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
+// That would make this code moot.  Keep it for now for backwards compatibility.
 void
 DataChannelConnection::DeliverQueuedData(uint16_t stream)
 {
   mLock.AssertCurrentThreadOwns();
 
   uint32_t i = 0;
   while (i < mQueuedData.Length()) {
     // Careful! we may modify the array length from within the loop!
@@ -1200,16 +1238,33 @@ DataChannelConnection::DeliverQueuedData
       mQueuedData.RemoveElementAt(i);
       continue; // don't bump index since we removed the element
     }
     i++;
   }
 }
 
 void
+DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
+                                            size_t length, uint16_t stream)
+{
+  DataChannel *channel;
+
+  mLock.AssertCurrentThreadOwns();
+
+  channel = FindChannelByStream(stream);
+  NS_ENSURE_TRUE_VOID(channel);
+
+  LOG(("OpenAck received for stream %u, waiting=%d", stream,
+       (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
+
+  channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
+}
+
+void
 DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
 {
   /* XXX: Send an error message? */
   LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
   // XXX Log to JS error console if possible
 }
 
 void
@@ -1220,16 +1275,18 @@ DataChannelConnection::HandleDataMessage
   DataChannel *channel;
   const char *buffer = (const char *) data;
 
   mLock.AssertCurrentThreadOwns();
 
   channel = FindChannelByStream(stream);
 
   // XXX A closed channel may trip this... check
+  // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
+  // That would make this code moot.  Keep it for now for backwards compatibility.
   if (!channel) {
     // In the updated 0-RTT open case, the sender can send data immediately
     // after Open, and doesn't set the in-order bit (since we don't have a
     // response or ack).  Also, with external negotiation, data can come in
     // before we're told about the external negotiation.  We need to buffer
     // data until either a) Open comes in, if the ordering get messed up,
     // or b) the app tells us this channel was externally negotiated.  When
     // these occur, we deliver the data.
@@ -1313,32 +1370,38 @@ DataChannelConnection::HandleDataMessage
   }
 }
 
 // Called with mLock locked!
 void
 DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
 {
   const struct rtcweb_datachannel_open_request *req;
+  const struct rtcweb_datachannel_ack *ack;
 
   mLock.AssertCurrentThreadOwns();
 
   switch (ppid) {
     case DATA_CHANNEL_PPID_CONTROL:
-      // structure includes a possibly-unused char label[1] (in a packed structure)
-      NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
+      req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
 
-      req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
+      NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
       switch (req->msg_type) {
         case DATA_CHANNEL_OPEN_REQUEST:
-          LOG(("length %u, sizeof(*req) = %u", length, sizeof(*req)));
-          NS_ENSURE_TRUE_VOID(length >= sizeof(*req));
+          // structure includes a possibly-unused char label[1] (in a packed structure)
+          NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
 
           HandleOpenRequestMessage(req, length, stream);
           break;
+        case DATA_CHANNEL_ACK:
+          // >= sizeof(*ack) checked above
+
+          ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
+          HandleOpenAckMessage(ack, length, stream);
+          break;
         default:
           HandleUnknownMessage(ppid, length, stream);
           break;
       }
       break;
     case DATA_CHANNEL_PPID_DOMSTRING:
     case DATA_CHANNEL_PPID_DOMSTRING_LAST:
     case DATA_CHANNEL_PPID_BINARY:
@@ -1576,30 +1639,30 @@ DataChannelConnection::ClearResets()
       LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
       mStreams[channel->mStream] = nullptr;
     }
   }
   mStreamsResetting.Clear();
 }
 
 void
-DataChannelConnection::ResetOutgoingStream(uint16_t streamOut)
+DataChannelConnection::ResetOutgoingStream(uint16_t stream)
 {
   uint32_t i;
 
   mLock.AssertCurrentThreadOwns();
   LOG(("Connection %p: Resetting outgoing stream %u",
-       (void *) this, streamOut));
+       (void *) this, stream));
   // Rarely has more than a couple items and only for a short time
   for (i = 0; i < mStreamsResetting.Length(); ++i) {
-    if (mStreamsResetting[i] == streamOut) {
+    if (mStreamsResetting[i] == stream) {
       return;
     }
   }
-  mStreamsResetting.AppendElement(streamOut);
+  mStreamsResetting.AppendElement(stream);
 }
 
 void
 DataChannelConnection::SendOutgoingStreamReset()
 {
   struct sctp_reset_streams *srs;
   uint32_t i;
   size_t len;
@@ -2005,16 +2068,21 @@ DataChannelConnection::OpenFinish(alread
 
 #ifdef TEST_QUEUED_DATA
   // It's painful to write a test for this...
   channel->mState = OPEN;
   channel->mReady = true;
   SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
 #endif
 
+  if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
+    // Don't send unordered until this gets cleared
+    channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
+  }
+
   if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
     if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
                                 stream,
                                 !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
                                 channel->mPrPolicy, channel->mPrValue)) {
       LOG(("SendOpenRequest failed, errno = %d", errno));
       if (errno == EAGAIN || errno == EWOULDBLOCK) {
         channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
@@ -2073,24 +2141,26 @@ DataChannelConnection::SendMsgInternal(D
 {
   uint16_t flags;
   struct sctp_sendv_spa spa;
   int32_t result;
 
   NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
   NS_WARN_IF_FALSE(length > 0, "Length is 0?!");
 
-  flags = (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) ? SCTP_UNORDERED : 0;
-
-  // To avoid problems where an in-order OPEN_RESPONSE is lost and an
+  // To avoid problems where an in-order OPEN is lost and an
   // out-of-order data message "beats" it, require data to be in-order
   // until we get an ACK.
-  if (channel->mState == CONNECTING) {
-    flags &= ~SCTP_UNORDERED;
+  if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
+      !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
+    flags = SCTP_UNORDERED;
+  } else {
+    flags = 0;
   }
+
   spa.sendv_sndinfo.snd_ppid = htonl(ppid);
   spa.sendv_sndinfo.snd_sid = channel->mStream;
   spa.sendv_sndinfo.snd_flags = flags;
   spa.sendv_sndinfo.snd_context = 0;
   spa.sendv_sndinfo.snd_assoc_id = 0;
   spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
 
   if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -202,42 +202,45 @@ private:
   static void DTLSConnectThread(void *data);
   int SendPacket(const unsigned char* data, size_t len, bool release);
   void SctpDtlsInput(TransportFlow *flow, const unsigned char *data, size_t len);
   static int SctpDtlsOutput(void *addr, void *buffer, size_t length, uint8_t tos, uint8_t set_df);
 #endif
   DataChannel* FindChannelByStream(uint16_t stream);
   uint16_t FindFreeStream();
   bool RequestMoreStreams(int32_t aNeeded = 16);
-  int32_t SendControlMessage(void *msg, uint32_t len, uint16_t streamOut);
+  int32_t SendControlMessage(void *msg, uint32_t len, uint16_t stream);
   int32_t SendOpenRequestMessage(const nsACString& label, const nsACString& protocol,
-                                 uint16_t streamOut,
+                                 uint16_t stream,
                                  bool unordered, uint16_t prPolicy, uint32_t prValue);
+  int32_t SendOpenAckMessage(uint16_t stream);
   int32_t SendMsgInternal(DataChannel *channel, const char *data,
                           uint32_t length, uint32_t ppid);
   int32_t SendBinary(DataChannel *channel, const char *data,
                      uint32_t len, uint32_t ppid_partial, uint32_t ppid_final);
   int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
 
   void DeliverQueuedData(uint16_t stream);
 
   already_AddRefed<DataChannel> OpenFinish(already_AddRefed<DataChannel> channel) NS_WARN_UNUSED_RESULT;
 
   void StartDefer();
   bool SendDeferredMessages();
   void ProcessQueuedOpens();
   void ClearResets();
   void SendOutgoingStreamReset();
-  void ResetOutgoingStream(uint16_t streamOut);
+  void ResetOutgoingStream(uint16_t stream);
   void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
                                 size_t length,
-                                uint16_t streamIn);
-  void HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t streamIn);
-  void HandleDataMessage(uint32_t ppid, const void *buffer, size_t length, uint16_t streamIn);
-  void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t streamIn);
+                                uint16_t stream);
+  void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
+                            size_t length, uint16_t stream);
+  void HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream);
+  void HandleDataMessage(uint32_t ppid, const void *buffer, size_t length, uint16_t stream);
+  void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream);
   void HandleAssociationChangeEvent(const struct sctp_assoc_change *sac);
   void HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc);
   void HandleRemoteErrorEvent(const struct sctp_remote_error *sre);
   void HandleShutdownEvent(const struct sctp_shutdown_event *sse);
   void HandleAdaptationIndication(const struct sctp_adaptation_event *sai);
   void HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe);
   void HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst);
   void HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg);
--- a/netwerk/sctp/datachannel/DataChannelProtocol.h
+++ b/netwerk/sctp/datachannel/DataChannelProtocol.h
@@ -30,33 +30,39 @@
 #define DATA_CHANNEL_FLAGS_SEND_REQ             0x00000001
 #define DATA_CHANNEL_FLAGS_SEND_RSP             0x00000002
 #define DATA_CHANNEL_FLAGS_SEND_ACK             0x00000004
 #define DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED 0x00000008
 #define DATA_CHANNEL_FLAGS_SEND_DATA            0x00000010
 #define DATA_CHANNEL_FLAGS_FINISH_OPEN          0x00000020
 #define DATA_CHANNEL_FLAGS_FINISH_RSP           0x00000040
 #define DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED  0x00000080
+#define DATA_CHANNEL_FLAGS_WAITING_ACK          0x00000100
 
 #define INVALID_STREAM (0xFFFF)
 // max is 0xFFFF: Streams 0 to 0xFFFE = 0xFFFF streams
 #define MAX_NUM_STREAMS (2048)
 
 struct rtcweb_datachannel_open_request {
   uint8_t  msg_type; // DATA_CHANNEL_OPEN
   uint8_t  channel_type;  
   int16_t  priority;
   uint32_t reliability_param;
   uint16_t label_length;
   uint16_t protocol_length;
   char     label[1]; // (and protocol) keep VC++ happy...
 } SCTP_PACKED;
 
+struct rtcweb_datachannel_ack {
+  uint8_t  msg_type; // DATA_CHANNEL_ACK
+} SCTP_PACKED;
+
 /* msg_type values: */
-/* 0-2 were used in an early version of the protocol with 3-way handshakes */
+/* 0-1 were used in an early version of the protocol with 3-way handshakes */
+#define DATA_CHANNEL_ACK                      2
 #define DATA_CHANNEL_OPEN_REQUEST             3
 
 /* channel_type values: */
 #define DATA_CHANNEL_RELIABLE                 0x00
 #define DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT  0x01
 #define DATA_CHANNEL_PARTIAL_RELIABLE_TIMED   0x02
 
 #define DATA_CHANNEL_RELIABLE_UNORDERED                0x80