Bug 1240209: use buffer-half-empty callbacks to send buffered data instead of a timer r=tuexen,drno
authorRandell Jesup <rjesup@jesup.org>
Tue, 31 May 2016 23:45:17 -0400
changeset 324333 fa2962ff448e58356955538a876787e411d7b18f
parent 324332 68eb3a876ea584571f47f465c9916970e050b343
child 324334 d7246e5a52b1f3e6f5f3a51fce569dec1072fe63
push id9671
push userraliiev@mozilla.com
push dateMon, 06 Jun 2016 20:27:52 +0000
treeherdermozilla-aurora@cea65ca3d0bd [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerstuexen, drno
bugs1240209
milestone49.0a1
Bug 1240209: use buffer-half-empty callbacks to send buffered data instead of a timer r=tuexen,drno
netwerk/sctp/datachannel/DataChannel.cpp
netwerk/sctp/datachannel/DataChannel.h
--- a/netwerk/sctp/datachannel/DataChannel.cpp
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -143,16 +143,52 @@ static int
 receive_cb(struct socket* sock, union sctp_sockstore addr,
            void *data, size_t datalen,
            struct sctp_rcvinfo rcv, int flags, void *ulp_info)
 {
   DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
   return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
 }
 
+static
+DataChannelConnection *
+GetConnectionFromSocket(struct socket* sock)
+{
+  struct sockaddr *addrs = nullptr;
+  int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
+  if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) {
+    return nullptr;
+  }
+  // usrsctp_getladdrs() returns the addresses bound to this socket, which
+  // contains the SctpDataMediaChannel* as sconn_addr.  Read the pointer,
+  // then free the list of addresses once we have the pointer.  We only open
+  // AF_CONN sockets, and they should all have the sconn_addr set to the
+  // pointer that created them, so [0] is as good as any other.
+  struct sockaddr_conn *sconn = reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
+  DataChannelConnection *connection =
+    reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
+  usrsctp_freeladdrs(addrs);
+
+  return connection;
+}
+
+// called when the buffer empties to the threshold value
+static int
+threshold_event(struct socket* sock, uint32_t sb_free)
+{
+  DataChannelConnection *connection = GetConnectionFromSocket(sock);
+  if (connection) {
+    LOG(("SendDeferred()"));
+    connection->SendDeferredMessages();
+  } else {
+    LOG(("Can't find connection for socket %p", sock));
+  }
+  return 0;
+}
+
 static void
 debug_printf(const char *format, ...)
 {
   va_list ap;
   char buffer[1024];
 
   if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
     va_start(ap, format);
@@ -171,18 +207,16 @@ DataChannelConnection::DataChannelConnec
    mLock("netwerk::sctp::DataChannelConnection")
 {
   mState = CLOSED;
   mSocket = nullptr;
   mMasterSocket = nullptr;
   mListener = listener;
   mLocalPort = 0;
   mRemotePort = 0;
-  mDeferTimeout = 10;
-  mTimerRunning = false;
   LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
   mInternalIOThread = nullptr;
 }
 
 DataChannelConnection::~DataChannelConnection()
 {
   LOG(("Deleting DataChannelConnection %p", (void *) this));
   // This may die on the MainThread, or on the STS thread
@@ -262,19 +296,16 @@ void DataChannelConnection::DestroyOnSTS
   if (aSocket && aSocket != aMasterSocket)
     usrsctp_close(aSocket);
   if (aMasterSocket)
     usrsctp_close(aMasterSocket);
 
   disconnect_all();
 }
 
-NS_IMPL_ISUPPORTS(DataChannelConnection,
-                  nsITimerCallback)
-
 bool
 DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
 {
   struct sctp_initmsg initmsg;
   struct sctp_udpencaps encaps;
   struct sctp_assoc_value av;
   struct sctp_event event;
   socklen_t len;
@@ -329,17 +360,18 @@ DataChannelConnection::Init(unsigned sho
   // Find the STS thread
   nsresult rv;
   mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
   MOZ_ASSERT(NS_SUCCEEDED(rv));
 
   // Open sctp with a callback
   if ((mMasterSocket = usrsctp_socket(
          aUsingDtls ? AF_CONN : AF_INET,
-         SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) {
+         SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
+         usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
     return false;
   }
 
   // Make non-blocking for bind/connect.  SCTP over UDP defaults to non-blocking
   // in associations for normal IO
   if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
     LOG(("Couldn't set non_blocking on SCTP socket"));
     // We can't handle connect() safely if it will block, not that this will
@@ -440,70 +472,16 @@ DataChannelConnection::Init(unsigned sho
 
 error_cleanup:
   usrsctp_close(mMasterSocket);
   mMasterSocket = nullptr;
   mUsingDtls = false;
   return false;
 }
 
-void
-DataChannelConnection::StartDefer()
-{
-  nsresult rv;
-  if (!NS_IsMainThread()) {
-    NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
-                                        DataChannelOnMessageAvailable::START_DEFER,
-                                        this, (DataChannel *) nullptr)));
-    return;
-  }
-
-  ASSERT_WEBRTC(NS_IsMainThread());
-  if (!mDeferredTimer) {
-    mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv);
-    MOZ_ASSERT(mDeferredTimer);
-  }
-
-  if (!mTimerRunning) {
-    rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
-                                          nsITimer::TYPE_ONE_SHOT);
-    NS_ENSURE_TRUE_VOID(rv == NS_OK);
-
-    mTimerRunning = true;
-  }
-}
-
-// nsITimerCallback
-
-NS_IMETHODIMP
-DataChannelConnection::Notify(nsITimer *timer)
-{
-  ASSERT_WEBRTC(NS_IsMainThread());
-  LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout));
-
-  if (timer == mDeferredTimer) {
-    if (SendDeferredMessages()) {
-      // Still blocked
-      // we don't need a lock, since this must be main thread...
-      nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
-                                                     nsITimer::TYPE_ONE_SHOT);
-      if (NS_FAILED(rv)) {
-        LOG(("%s: cannot initialize open timer", __FUNCTION__));
-        // XXX and do....?
-        return rv;
-      }
-      mTimerRunning = true;
-    } else {
-      LOG(("Turned off deferred send timer"));
-      mTimerRunning = false;
-    }
-  }
-  return NS_OK;
-}
-
 #ifdef MOZ_PEERCONNECTION
 void
 DataChannelConnection::SetEvenOdd()
 {
   ASSERT_WEBRTC(IsSTSThread());
 
   TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
       mTransportFlow->GetLayer(TransportLayerDtls::ID()));
@@ -1025,17 +1003,16 @@ DataChannelConnection::SendOpenRequestMe
 
 // returns if we're still blocked or not
 bool
 DataChannelConnection::SendDeferredMessages()
 {
   uint32_t i;
   RefPtr<DataChannel> channel; // we may null out the refs to this
   bool still_blocked = false;
-  bool sent = false;
 
   // This may block while something is modifying channels, but should not block for IO
   MutexAutoLock lock(mLock);
 
   // XXX For total fairness, on a still_blocked we'd start next time at the
   // same index.  Sorry, not going to bother for now.
   for (i = 0; i < mStreams.Length(); ++i) {
     channel = mStreams[i];
@@ -1051,17 +1028,16 @@ DataChannelConnection::SendDeferredMessa
         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
 
         channel->mState = OPEN;
         channel->mReady = true;
         LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
         NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
                                   DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
                                   channel)));
-        sent = true;
       } else {
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
           still_blocked = true;
         } else {
           // Close the channel, inform the user
           mStreams[channel->mStream] = nullptr;
           channel->mState = CLOSED;
           // Don't need to reset; we didn't open it
@@ -1072,17 +1048,16 @@ DataChannelConnection::SendDeferredMessa
       }
     }
     if (still_blocked)
       break;
 
     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
       if (SendOpenAckMessage(channel->mStream)) {
         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
-        sent = true;
       } else {
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
           still_blocked = true;
         } else {
           // Close the channel, inform the user
           CloseInt(channel);
           // XXX send error via DataChannelOnMessageAvailable (bug 843625)
         }
@@ -1094,17 +1069,17 @@ DataChannelConnection::SendDeferredMessa
     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
       bool failed_send = false;
       int32_t result;
 
       if (channel->mState == CLOSED || channel->mState == CLOSING) {
         channel->mBufferedData.Clear();
       }
 
-      uint32_t buffered_amount = channel->GetBufferedAmount();
+      uint32_t buffered_amount = channel->GetBufferedAmountLocked();
       uint32_t threshold = channel->GetBufferedAmountLowThreshold();
       bool was_over_threshold = buffered_amount >= threshold;
 
       while (!channel->mBufferedData.IsEmpty() &&
              !failed_send) {
         struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
         const char *data           = channel->mBufferedData[0]->mData;
         size_t len                 = channel->mBufferedData[0]->mLength;
@@ -1121,17 +1096,16 @@ DataChannelConnection::SendDeferredMessa
             failed_send = true;
             LOG(("queue full again when resending %d bytes (%d)", len, result));
           } else {
             LOG(("error %d re-sending string", errno));
             failed_send = true;
           }
         } else {
           LOG(("Resent buffer of %d bytes (%d)", len, result));
-          sent = true;
           // In theory this could underflow if >4GB was buffered and re
           // truncated in GetBufferedAmount(), but this won't cause any problems.
           buffered_amount -= channel->mBufferedData[0]->mLength;
           channel->mBufferedData.RemoveElementAt(0);
           // can never fire with default threshold of 0
           if (was_over_threshold && buffered_amount < threshold) {
             LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
                  channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
@@ -1155,28 +1129,17 @@ DataChannelConnection::SendDeferredMessa
         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
       else
         still_blocked = true;
     }
     if (still_blocked)
       break;
   }
 
-  if (!still_blocked) {
-    // mDeferTimeout becomes an estimate of how long we need to wait next time we block
-    return false;
-  }
-  // adjust time?  More time for next wait if we didn't send anything, less if did
-  // Pretty crude, but better than nothing; just to keep CPU use down
-  if (!sent && mDeferTimeout < 50)
-    mDeferTimeout++;
-  else if (sent && mDeferTimeout > 10)
-    mDeferTimeout--;
-
-  return true;
+  return still_blocked;
 }
 
 void
 DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
                                                 size_t length,
                                                 uint16_t stream)
 {
   RefPtr<DataChannel> channel;
@@ -1263,17 +1226,18 @@ DataChannelConnection::HandleOpenRequest
                             DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
                             this, channel)));
 
   LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
 
   if (!SendOpenAckMessage(stream)) {
     // XXX Only on EAGAIN!?  And if not, then close the channel??
     channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
-    StartDefer();
+    // Note: we're locked, so there's no danger of a race with the
+    // buffer-threshold callback
   }
 
   // Now process any queued data messages for the channel (which will
   // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
   // more that come in before that happens)
   DeliverQueuedData(stream);
 }
 
@@ -1877,18 +1841,18 @@ DataChannelConnection::HandleStreamChang
                                   channel)));
         // maybe fire onError (bug 843625)
       } else {
         stream = FindFreeStream();
         if (stream != INVALID_STREAM) {
           channel->mStream = stream;
           mStreams[stream] = channel;
           channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
-          /// XXX fix
-          StartDefer();
+          // Note: we're locked, so there's no danger of a race with the
+          // buffer-threshold callback
         } else {
           /* We will not find more ... */
           break;
         }
       }
     }
   }
 }
@@ -2142,18 +2106,18 @@ DataChannelConnection::OpenFinish(alread
   if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
     if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
                                 stream,
                                 !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
                                 channel->mPrPolicy, channel->mPrValue)) {
       LOG(("SendOpenRequest failed, errno = %d", errno));
       if (errno == EAGAIN || errno == EWOULDBLOCK) {
         channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
-        StartDefer();
-
+        // Note: we're locked, so there's no danger of a race with the
+        // buffer-threshold callback
         return channel.forget();
       } else {
         if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
           // We already returned the channel to the app.
           NS_ERROR("Failed to send open request");
           NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
                                     DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
                                     channel)));
@@ -2232,35 +2196,48 @@ DataChannelConnection::SendMsgInternal(D
 
   // Note: Main-thread IO, but doesn't block!
   // XXX FIX!  to deal with heavy overruns of JS trying to pass data in
   // (more than the buffersize) queue data onto another thread to do the
   // actual sends.  See netwerk/protocol/websocket/WebSocketChannel.cpp
 
   // SCTP will return EMSGSIZE if the message is bigger than the buffer
   // size (or EAGAIN if there isn't space)
+
+  // Avoid a race between buffer-full-failure (where we have to add the
+  // packet to the buffered-data queue) and the buffer-now-only-half-full
+  // callback, which happens on a different thread.  Otherwise we might
+  // fail here, then before we add it to the queue get the half-full
+  // callback, find nothing to do, then on this thread add it to the
+  // queue - which would sit there.  Also, if we later send more data, it
+  // would arrive ahead of the buffered message, but if the buffer ever
+  // got to 1/2 full, the message would get sent - but at a semi-random
+  // time, after other data it was supposed to be in front of.
+
+  // Must lock before empty check for similar reasons!
+  MutexAutoLock lock(mLock);
   if (channel->mBufferedData.IsEmpty()) {
     result = usrsctp_sendv(mSocket, data, length,
                            nullptr, 0,
                            (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
                            SCTP_SENDV_SPA, 0);
     LOG(("Sent buffer (len=%u), result=%d", length, result));
   } else {
     // Fake EAGAIN if we're already buffering data
     result = -1;
     errno = EAGAIN;
   }
   if (result < 0) {
     if (errno == EAGAIN || errno == EWOULDBLOCK) {
+
       // queue data for resend!  And queue any further data for the stream until it is...
       BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc
       channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
       channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
       LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
-      StartDefer();
       return 0;
     }
     LOG(("error %d sending string", errno));
   }
   return result;
 }
 
 // Handles fragmenting binary messages
@@ -2619,19 +2596,20 @@ DataChannel::AppReady()
   }
   mQueuedMessages.Clear();
   mQueuedMessages.Compact();
   // We never use it again...  We could even allocate the array in the odd
   // cases we need it.
 }
 
 uint32_t
-DataChannel::GetBufferedAmount()
+DataChannel::GetBufferedAmountLocked() const
 {
   size_t buffered = 0;
+
   for (auto& buffer : mBufferedData) {
     buffered += buffer->mLength;
   }
   // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
   // amount from the SCTP stack for a single stream.  It is on their to-do
   // list, and once we import a stack with support for that, we'll need to
   // add it to what we buffer.  Also we'll need to ask for notification of a per-
   // stream buffer-low event and merge that into the handling of buffer-low
--- a/netwerk/sctp/datachannel/DataChannel.h
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -16,17 +16,16 @@
 #include "nsISupports.h"
 #include "nsCOMPtr.h"
 #include "mozilla/WeakPtr.h"
 #include "nsString.h"
 #include "nsThreadUtils.h"
 #include "nsTArray.h"
 #include "nsDeque.h"
 #include "nsIInputStream.h"
-#include "nsITimer.h"
 #include "mozilla/Mutex.h"
 #include "DataChannelProtocol.h"
 #include "DataChannelListener.h"
 #ifdef SCTP_DTLS_SUPPORTED
 #include "mtransport/sigslot.h"
 #include "mtransport/transportflow.h"
 #include "mtransport/transportlayer.h"
 #include "mtransport/transportlayerdtls.h"
@@ -87,26 +86,25 @@ public:
 
   uint16_t mStream;
   uint32_t mPpid;
   size_t   mLength;
   char     *mData;
 };
 
 // One per PeerConnection
-class DataChannelConnection: public nsITimerCallback
+class DataChannelConnection
 #ifdef SCTP_DTLS_SUPPORTED
-                             , public sigslot::has_slots<>
+  : public sigslot::has_slots<>
 #endif
 {
   virtual ~DataChannelConnection();
 
 public:
-  NS_DECL_THREADSAFE_ISUPPORTS
-  NS_DECL_NSITIMERCALLBACK
+  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannelConnection)
 
   class DataConnectionListener : public SupportsWeakPtr<DataConnectionListener>
   {
   public:
     MOZ_DECLARE_WEAKREFERENCE_TYPENAME(DataChannelConnection::DataConnectionListener)
     virtual ~DataConnectionListener() {}
 
     // Called when a new DataChannel has been opened by the other side.
@@ -184,16 +182,18 @@ public:
 
   friend class DataChannel;
   Mutex  mLock;
 
   void ReadBlob(already_AddRefed<DataChannelConnection> aThis, uint16_t aStream, nsIInputStream* aBlob);
 
   void GetStreamIds(std::vector<uint16_t>* aStreamList);
 
+  bool SendDeferredMessages();
+
 protected:
   friend class DataChannelOnMessageAvailable;
   // Avoid cycles with PeerConnectionImpl
   // Use from main thread only as WeakPtr is not threadsafe
   WeakPtr<DataConnectionListener> mListener;
 
 private:
   friend class DataChannelConnectRunnable;
@@ -217,18 +217,16 @@ private:
   int32_t SendBinary(DataChannel *channel, const char *data,
                      size_t len, uint32_t ppid_partial, uint32_t ppid_final);
   int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
 
   void DeliverQueuedData(uint16_t stream);
 
   already_AddRefed<DataChannel> OpenFinish(already_AddRefed<DataChannel>&& aChannel);
 
-  void StartDefer();
-  bool SendDeferredMessages();
   void ProcessQueuedOpens();
   void ClearResets();
   void SendOutgoingStreamReset();
   void ResetOutgoingStream(uint16_t stream);
   void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
                                 size_t length,
                                 uint16_t stream);
   void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
@@ -261,37 +259,33 @@ private:
 
   // Data:
   // NOTE: while this array will auto-expand, increases in the number of
   // channels available from the stack must be negotiated!
   bool mAllocateEven;
   AutoTArray<RefPtr<DataChannel>,16> mStreams;
   nsDeque mPending; // Holds addref'ed DataChannel's -- careful!
   // holds data that's come in before a channel is open
-  nsTArray<nsAutoPtr<QueuedDataMessage> > mQueuedData;
+  nsTArray<nsAutoPtr<QueuedDataMessage>> mQueuedData;
 
   // Streams pending reset
   AutoTArray<uint16_t,4> mStreamsResetting;
 
   struct socket *mMasterSocket; // accessed from STS thread
   struct socket *mSocket; // cloned from mMasterSocket on successful Connect on STS thread
   uint16_t mState; // Protected with mLock
 
 #ifdef SCTP_DTLS_SUPPORTED
   RefPtr<TransportFlow> mTransportFlow;
   nsCOMPtr<nsIEventTarget> mSTS;
 #endif
   uint16_t mLocalPort; // Accessed from connect thread
   uint16_t mRemotePort;
   bool mUsingDtls;
 
-  // Timer to control when we try to resend blocked messages
-  nsCOMPtr<nsITimer> mDeferredTimer;
-  uint32_t mDeferTimeout; // in ms
-  bool mTimerRunning;
   nsCOMPtr<nsIThread> mInternalIOThread;
 };
 
 #define ENSURE_DATACONNECTION \
   do { if (!mConnection) { DATACHANNEL_LOG(("%s: %p no connection!",__FUNCTION__, this)); return; } } while (0)
 
 #define ENSURE_DATACONNECTION_RET(x) \
   do { if (!mConnection) { DATACHANNEL_LOG(("%s: %p no connection!",__FUNCTION__, this)); return (x); } } while (0)
@@ -382,17 +376,22 @@ public:
         return false;
     }
 
   uint16_t GetType() { return mPrPolicy; }
 
   bool GetOrdered() { return !(mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED); }
 
   // Amount of data buffered to send
-  uint32_t GetBufferedAmount();
+  uint32_t GetBufferedAmount()
+  {
+    MutexAutoLock lock(mConnection->mLock);
+    return GetBufferedAmountLocked();
+  }
+
 
   // Trigger amount for generating BufferedAmountLow events
   uint32_t GetBufferedAmountLowThreshold();
   void SetBufferedAmountLowThreshold(uint32_t aThreshold);
 
   // Find out state
   uint16_t GetReadyState()
     {
@@ -418,48 +417,48 @@ protected:
   DataChannelListener *mListener;
   nsCOMPtr<nsISupports> mContext;
 
 private:
   friend class DataChannelOnMessageAvailable;
   friend class DataChannelConnection;
 
   nsresult AddDataToBinaryMsg(const char *data, uint32_t size);
+  uint32_t GetBufferedAmountLocked() const;
 
   RefPtr<DataChannelConnection> mConnection;
   nsCString mLabel;
   nsCString mProtocol;
   uint16_t mState;
   bool     mReady;
   uint16_t mStream;
   uint16_t mPrPolicy;
   uint32_t mPrValue;
   uint32_t mFlags;
   uint32_t mId;
   bool mIsRecvBinary;
   size_t mBufferedThreshold;
   nsCString mRecvBuffer;
-  nsTArray<nsAutoPtr<BufferedMsg> > mBufferedData;
-  nsTArray<nsCOMPtr<nsIRunnable> > mQueuedMessages;
+  nsTArray<nsAutoPtr<BufferedMsg>> mBufferedData; // GUARDED_BY(mConnection->mLock)
+  nsTArray<nsCOMPtr<nsIRunnable>> mQueuedMessages;
 };
 
 // used to dispatch notifications of incoming data to the main thread
 // Patterned on CallOnMessageAvailable in WebSockets
 // Also used to proxy other items to MainThread
 class DataChannelOnMessageAvailable : public Runnable
 {
 public:
   enum {
     ON_CONNECTION,
     ON_DISCONNECTED,
     ON_CHANNEL_CREATED,
     ON_CHANNEL_OPEN,
     ON_CHANNEL_CLOSED,
     ON_DATA,
-    START_DEFER,
     BUFFER_LOW_THRESHOLD,
     NO_LONGER_BUFFERED,
   };  /* types */
 
   DataChannelOnMessageAvailable(int32_t     aType,
                                 DataChannelConnection *aConnection,
                                 DataChannel *aChannel,
                                 nsCString   &aData,  // XXX this causes inefficiency
@@ -550,19 +549,16 @@ public:
           case ON_CHANNEL_CREATED:
             // important to give it an already_AddRefed pointer!
             mConnection->mListener->NotifyDataChannel(mChannel.forget());
             break;
           default:
             break;
         }
         break;
-      case START_DEFER:
-        mConnection->StartDefer();
-        break;
     }
     return NS_OK;
   }
 
 private:
   ~DataChannelOnMessageAvailable() {}
 
   int32_t                           mType;