Bug 859179: Support large strings by fragmentation r=tuexen
authorRandell Jesup <rjesup@jesup.org>
Fri, 12 Apr 2013 10:08:49 -0400
changeset 128595 b023bbaffecfbf9b8447eb3a7d339c77ddf9694b
parent 128594 dd9e94fd401f468e610ea995be7035279859349b
child 128596 f41f0742351bd5480ff7a5997423fe4532f843d2
push id24532
push userryanvm@gmail.com
push dateFri, 12 Apr 2013 19:06:49 +0000
treeherdermozilla-central@2aff2d574a1e [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerstuexen
bugs859179
milestone23.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 859179: Support large strings by fragmentation r=tuexen
netwerk/sctp/datachannel/DataChannel.cpp
netwerk/sctp/datachannel/DataChannel.h
netwerk/sctp/datachannel/DataChannelProtocol.h
--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -1205,47 +1205,66 @@ DataChannelConnection::HandleDataMessage
     return;
   }
 
   // XXX should this be a simple if, no warnings/debugbreaks?
   NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
 
   {
     nsAutoCString recvData(buffer, length);
+    bool is_binary = true;
+
+    if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
+        ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
+      is_binary = false;
+    }
+    if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
+      NS_WARNING("DataChannel message aborted by fragment type change!");
+      channel->mRecvBuffer.Truncate(0);
+    }
+    channel->mIsRecvBinary = is_binary;
 
     switch (ppid) {
       case DATA_CHANNEL_PPID_DOMSTRING:
-        LOG(("DataChannel: String message received of length %lu on channel %u: %.*s",
-             length, channel->mStream, (int)PR_MIN(length, 80), buffer));
+      case DATA_CHANNEL_PPID_BINARY:
+        channel->mRecvBuffer += recvData;
+        LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u",
+             is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
+             channel->mStream));
+        return; // Not ready to notify application
+
+      case DATA_CHANNEL_PPID_DOMSTRING_LAST:
+        LOG(("DataChannel: String message received of length %lu on channel %u",
+             length, channel->mStream));
+        if (!channel->mRecvBuffer.IsEmpty()) {
+          channel->mRecvBuffer += recvData;
+          LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
+          channel->SendOrQueue(new DataChannelOnMessageAvailable(
+                                 DataChannelOnMessageAvailable::ON_DATA, this,
+                                 channel, channel->mRecvBuffer, -1));
+          channel->mRecvBuffer.Truncate(0);
+          return;
+        }
+        // else send using recvData normally
         length = -1; // Flag for DOMString
 
         // WebSockets checks IsUTF8() here; we can try to deliver it
-
-        NS_WARN_IF_FALSE(channel->mBinaryBuffer.IsEmpty(), "Binary message aborted by text message!");
-        if (!channel->mBinaryBuffer.IsEmpty())
-          channel->mBinaryBuffer.Truncate(0);
         break;
 
-      case DATA_CHANNEL_PPID_BINARY:
-        channel->mBinaryBuffer += recvData;
-        LOG(("DataChannel: Received binary message of length %lu (total %u) on channel id %u",
-             length, channel->mBinaryBuffer.Length(), channel->mStream));
-        return; // Not ready to notify application
-
       case DATA_CHANNEL_PPID_BINARY_LAST:
         LOG(("DataChannel: Received binary message of length %lu on channel id %u",
              length, channel->mStream));
-        if (!channel->mBinaryBuffer.IsEmpty()) {
-          channel->mBinaryBuffer += recvData;
+        if (!channel->mRecvBuffer.IsEmpty()) {
+          channel->mRecvBuffer += recvData;
           LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
           channel->SendOrQueue(new DataChannelOnMessageAvailable(
                                  DataChannelOnMessageAvailable::ON_DATA, this,
-                                 channel, channel->mBinaryBuffer,
-                                 channel->mBinaryBuffer.Length()));
-          channel->mBinaryBuffer.Truncate(0);
+                                 channel, channel->mRecvBuffer,
+                                 channel->mRecvBuffer.Length()));
+          channel->mRecvBuffer.Truncate(0);
           return;
         }
         // else send using recvData normally
         break;
 
       default:
         NS_ERROR("Unknown data PPID");
         return;
@@ -1279,16 +1298,17 @@ DataChannelConnection::HandleMessage(con
           HandleOpenRequestMessage(req, length, stream);
           break;
         default:
           HandleUnknownMessage(ppid, length, stream);
           break;
       }
       break;
     case DATA_CHANNEL_PPID_DOMSTRING:
+    case DATA_CHANNEL_PPID_DOMSTRING_LAST:
     case DATA_CHANNEL_PPID_BINARY:
     case DATA_CHANNEL_PPID_BINARY_LAST:
       HandleDataMessage(ppid, buffer, length, stream);
       break;
     default:
       LOG(("Message of length %lu, PPID %u on stream %u received.",
            length, ppid, stream));
       break;
@@ -1889,17 +1909,17 @@ DataChannelConnection::OpenFinish(alread
     // OPEN and externally negotiated stream
     mStreams[stream] = channel;
   }
 
 #ifdef TEST_QUEUED_DATA
   // It's painful to write a test for this...
   channel->mState = OPEN;
   channel->mReady = true;
-  SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING);
+  SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
 #endif
 
   if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
     if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
                                 stream,
                                 !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
                                 channel->mPrPolicy, channel->mPrValue)) {
       LOG(("SendOpenRequest failed, errno = %d", errno));
@@ -2002,17 +2022,18 @@ DataChannelConnection::SendMsgInternal(D
     LOG(("error %d sending string", errno));
   }
   return result;
 }
 
 // Handles fragmenting binary messages
 int32_t
 DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
-                                  uint32_t len)
+                                  uint32_t len,
+                                  uint32_t ppid_partial, uint32_t ppid_final)
 {
   // Since there's a limit on network buffer size and no limits on message
   // size, and we don't want to use EOR mode (multiple writes for a
   // message, but all other streams are blocked until you finish sending
   // this message), we need to add application-level fragmentation of large
   // messages.  On a reliable channel, these can be simply rebuilt into a
   // large message.  On an unreliable channel, we can't and don't know how
   // long to wait, and there are no retransmissions, and no easy way to
@@ -2029,33 +2050,33 @@ DataChannelConnection::SendBinary(DataCh
     int32_t sent=0;
     uint32_t origlen = len;
     LOG(("Sending binary message length %u in chunks", len));
     // XXX check flags for out-of-order, or force in-order for large binary messages
     while (len > 0) {
       uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
       uint32_t ppid;
       len -= sendlen;
-      ppid = len > 0 ? DATA_CHANNEL_PPID_BINARY : DATA_CHANNEL_PPID_BINARY_LAST;
+      ppid = len > 0 ? ppid_partial : ppid_final;
       LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
       // Note that these might end up being deferred and queued.
       sent += SendMsgInternal(channel, data, sendlen, ppid);
       data += sendlen;
     }
     LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued",
          (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
          origlen, sent,
          channel->mBufferedData.Length()));
     return sent;
   }
   NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
                    "Sending too-large data on unreliable channel!");
 
-  // This will fail if the message is too large
-  return SendMsgInternal(channel, data, len, DATA_CHANNEL_PPID_BINARY_LAST);
+  // This will fail if the message is too large (default 256K)
+  return SendMsgInternal(channel, data, len, ppid_final);
 }
 
 int32_t
 DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
 {
   DataChannel *channel = mStreams[stream];
   NS_ENSURE_TRUE(channel, 0);
   // Spawn a thread to send the data
@@ -2086,17 +2107,18 @@ DataChannelConnection::SendBlob(uint16_t
   // sending them at big, single messages, which if large will probably not
   // get through.
 
   // XXX For now, send as one large binary message.  We should also signal
   // (via PPID) that it's a blob.
   const char *data = temp.get()->BeginReading();
   len              = temp.get()->Length();
 
-  return SendBinary(channel, data, len);
+  return SendBinary(channel, data, len,
+                    DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
 }
 
 int32_t
 DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
                                      bool isBinary)
 {
   ASSERT_WEBRTC(NS_IsMainThread());
   // We really could allow this from other threads, so long as we deal with
@@ -2108,18 +2130,20 @@ DataChannelConnection::SendMsgCommon(uin
   DataChannel *channel;
 
   LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
   // XXX if we want more efficiency, translate flags once at open time
   channel = mStreams[stream];
   NS_ENSURE_TRUE(channel, 0);
 
   if (isBinary)
-    return SendBinary(channel, data, len);
-  return SendMsgInternal(channel, data, len, DATA_CHANNEL_PPID_DOMSTRING);
+    return SendBinary(channel, data, len,
+                      DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
+  return SendBinary(channel, data, len,
+                    DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
 }
 
 void
 DataChannelConnection::Close(DataChannel *aChannel)
 {
   MutexAutoLock lock(mLock);
   CloseInt(aChannel);
 }
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -228,17 +228,17 @@ private:
   bool RequestMoreStreams(int32_t aNeeded = 16);
   int32_t SendControlMessage(void *msg, uint32_t len, uint16_t streamOut);
   int32_t SendOpenRequestMessage(const nsACString& label, const nsACString& protocol,
                                  uint16_t streamOut,
                                  bool unordered, uint16_t prPolicy, uint32_t prValue);
   int32_t SendMsgInternal(DataChannel *channel, const char *data,
                           uint32_t length, uint32_t ppid);
   int32_t SendBinary(DataChannel *channel, const char *data,
-                     uint32_t len);
+                     uint32_t len, uint32_t ppid_partial, uint32_t ppid_final);
   int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
 
   void DeliverQueuedData(uint16_t stream);
 
   already_AddRefed<DataChannel> OpenFinish(already_AddRefed<DataChannel> channel);
 
   void StartDefer();
   bool SendDeferredMessages();
@@ -336,16 +336,17 @@ public:
     , mLabel(label)
     , mProtocol(protocol)
     , mState(state)
     , mReady(false)
     , mStream(stream)
     , mPrPolicy(policy)
     , mPrValue(value)
     , mFlags(0)
+    , mIsRecvBinary(false)
     {
       NS_ASSERTION(mConnection,"NULL connection");
     }
 
   ~DataChannel();
   void Destroy(); // when we disconnect from the connection after stream RESET
 
   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannel)
@@ -431,17 +432,18 @@ private:
   nsCString mProtocol;
   uint16_t mState;
   bool     mReady;
   uint16_t mStream;
   uint16_t mPrPolicy;
   uint32_t mPrValue;
   uint32_t mFlags;
   uint32_t mId;
-  nsCString mBinaryBuffer;
+  bool mIsRecvBinary;
+  nsCString mRecvBuffer;
   nsTArray<nsAutoPtr<BufferedMsg> > mBufferedData;
   nsTArray<nsCOMPtr<nsIRunnable> > mQueuedMessages;
 };
 
 // used to dispatch notifications of incoming data to the main thread
 // Patterned on CallOnMessageAvailable in WebSockets
 // Also used to proxy other items to MainThread
 class DataChannelOnMessageAvailable : public nsRunnable
--- a/netwerk/sctp/datachannel/DataChannelProtocol.h
+++ b/netwerk/sctp/datachannel/DataChannelProtocol.h
@@ -14,20 +14,21 @@
 #define SCTP_PACKED
 #else
 #error "Unsupported compiler"
 #endif
 
 // Duplicated in fsm.def
 #define WEBRTC_DATACHANNEL_STREAMS_DEFAULT 16
 
-#define DATA_CHANNEL_PPID_CONTROL   50
-#define DATA_CHANNEL_PPID_DOMSTRING 51
-#define DATA_CHANNEL_PPID_BINARY    52
-#define DATA_CHANNEL_PPID_BINARY_LAST 53
+#define DATA_CHANNEL_PPID_CONTROL        50
+#define DATA_CHANNEL_PPID_BINARY         52
+#define DATA_CHANNEL_PPID_BINARY_LAST    53
+#define DATA_CHANNEL_PPID_DOMSTRING      54
+#define DATA_CHANNEL_PPID_DOMSTRING_LAST 51
 
 #define DATA_CHANNEL_MAX_BINARY_FRAGMENT 0x4000
 
 #define DATA_CHANNEL_FLAGS_SEND_REQ             0x00000001
 #define DATA_CHANNEL_FLAGS_SEND_RSP             0x00000002
 #define DATA_CHANNEL_FLAGS_SEND_ACK             0x00000004
 #define DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED 0x00000008
 #define DATA_CHANNEL_FLAGS_SEND_DATA            0x00000010