Bug 1663718 - Don't put too much data in buffer when the data can't be written to socket r=dragana
authorKershaw Chang <kershaw@mozilla.com>
Fri, 11 Sep 2020 08:44:07 +0000
changeset 548298 a2372b6285d7871f7eb6e914583b1113da2038a1
parent 548297 433c4ac7d65a7ee98ae406cbc0421185a0c23021
child 548299 92159cd14d67e0dfbd5bca16b3c1a449ec9b0243
push id37776
push userbtara@mozilla.com
push dateFri, 11 Sep 2020 15:10:42 +0000
treeherdermozilla-central@b133e2d673e8 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersdragana
bugs1663718
milestone82.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1663718 - Don't put too much data in buffer when the data can't be written to socket r=dragana The main reason we used too much memory is that we ignore the `NS_BASE_STREAM_WOULD_BLOCK` returned from socket output stream. When the output stream is blocked, all the data is stored in the output queue and make the memory usage high. Differential Revision: https://phabricator.services.mozilla.com/D89563
netwerk/protocol/websocket/WebSocketChannel.cpp
netwerk/protocol/websocket/WebSocketConnectionChild.cpp
netwerk/protocol/websocket/nsIWebSocketConnection.idl
netwerk/protocol/websocket/nsWebSocketConnection.cpp
netwerk/protocol/websocket/nsWebSocketConnection.h
--- a/netwerk/protocol/websocket/WebSocketChannel.cpp
+++ b/netwerk/protocol/websocket/WebSocketChannel.cpp
@@ -3857,16 +3857,20 @@ void WebSocketChannel::DoEnqueueOutgoing
         ("WebSocketChannel::DoEnqueueOutgoingMessage: "
          "Try to send %u of hdr/copybreak and %u of data\n",
          mHdrOutSize, mCurrentOut->Length()));
 
     nsresult rv = mConnection->EnqueueOutputData(
         mHdrOut, mHdrOutSize, (uint8_t*)mCurrentOut->BeginReading(),
         mCurrentOut->Length());
 
+    if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
+      return;
+    }
+
     LOG(("WebSocketChannel::DoEnqueueOutgoingMessage: rv %" PRIx32 "\n",
          static_cast<uint32_t>(rv)));
 
     if (NS_FAILED(rv)) {
       AbortSession(rv);
       return;
     }
 
@@ -3919,12 +3923,18 @@ WebSocketChannel::OnTCPClosed() {
   return NS_OK;
 }
 
 NS_IMETHODIMP
 WebSocketChannel::OnDataReceived(uint8_t* aData, uint32_t aCount) {
   return ProcessInput(aData, aCount);
 }
 
+NS_IMETHODIMP
+WebSocketChannel::OnReadyToSendData() {
+  DoEnqueueOutgoingMessage();
+  return NS_OK;
+}
+
 }  // namespace net
 }  // namespace mozilla
 
 #undef CLOSE_GOING_AWAY
--- a/netwerk/protocol/websocket/WebSocketConnectionChild.cpp
+++ b/netwerk/protocol/websocket/WebSocketConnectionChild.cpp
@@ -169,16 +169,22 @@ WebSocketConnectionChild::OnDataReceived
   if (CanSend()) {
     nsTArray<uint8_t> data;
     data.AppendElements(aData, aCount);
     Unused << SendOnDataReceived(std::move(data));
   }
   return NS_OK;
 }
 
+NS_IMETHODIMP
+WebSocketConnectionChild::OnReadyToSendData() {
+  // TODO: implement flow control between parent and socket process.
+  return NS_OK;
+}
+
 void WebSocketConnectionChild::ActorDestroy(ActorDestroyReason aWhy) {
   LOG(("WebSocketConnectionChild::ActorDestroy %p\n", this));
   if (mConnection) {
     mConnection->Close();
     mConnection = nullptr;
   }
 }
 
--- a/netwerk/protocol/websocket/nsIWebSocketConnection.idl
+++ b/netwerk/protocol/websocket/nsIWebSocketConnection.idl
@@ -66,9 +66,14 @@ interface nsIWebSocketConnection : nsISu
 interface nsIWebSocketConnectionListener : nsISupports
 {
     void onError(in nsresult aStatus);
 
     void onTCPClosed();
 
     void onDataReceived([array, size_is(dataLength)]in uint8_t data,
                         in unsigned long dataLength);
+
+    /**
+     * Called to inform the listener that the outgoing data is ready to write.
+     */
+    void onReadyToSendData();
 };
--- a/netwerk/protocol/websocket/nsWebSocketConnection.cpp
+++ b/netwerk/protocol/websocket/nsWebSocketConnection.cpp
@@ -15,17 +15,18 @@ NS_IMPL_ISUPPORTS(nsWebSocketConnection,
 
 nsWebSocketConnection::nsWebSocketConnection(
     nsISocketTransport* aTransport, nsIAsyncInputStream* aInputStream,
     nsIAsyncOutputStream* aOutputStream)
     : mTransport(aTransport),
       mSocketIn(aInputStream),
       mSocketOut(aOutputStream),
       mWriteOffset(0),
-      mStartReadingCalled(false) {
+      mStartReadingCalled(false),
+      mOutputStreamBlocked(false) {
   LOG(("nsWebSocketConnection ctor %p\n", this));
 }
 
 nsWebSocketConnection::~nsWebSocketConnection() {
   LOG(("nsWebSocketConnection dtor %p\n", this));
 }
 
 NS_IMETHODIMP
@@ -100,16 +101,20 @@ nsWebSocketConnection::EnqueueOutputData
   data.AppendElements(aHdrBuf, aHdrBufLength);
   data.AppendElements(aPayloadBuf, aPayloadBufLength);
   mOutputQueue.emplace_back(std::move(data));
 
   if (mSocketOut) {
     mSocketOut->AsyncWait(this, 0, 0, mEventTarget);
   }
 
+  if (mOutputStreamBlocked) {
+    return NS_BASE_STREAM_WOULD_BLOCK;
+  }
+
   return NS_OK;
 }
 
 NS_IMETHODIMP
 nsWebSocketConnection::StartReading() {
   if (!mSocketIn) {
     return NS_ERROR_NOT_AVAILABLE;
   }
@@ -210,31 +215,34 @@ nsWebSocketConnection::OnOutputStreamRea
   LOG(("nsWebSocketConnection::OnOutputStreamReady() %p\n", this));
   MOZ_ASSERT(mEventTarget->IsOnCurrentThread());
   MOZ_ASSERT(mListener);
 
   if (!mSocketOut) {
     return NS_OK;
   }
 
+  mOutputStreamBlocked = false;
+
   while (!mOutputQueue.empty()) {
     const OutputData& data = mOutputQueue.front();
 
     char* buffer = reinterpret_cast<char*>(
                        const_cast<uint8_t*>(data.GetData().Elements())) +
                    mWriteOffset;
     uint32_t toWrite = data.GetData().Length() - mWriteOffset;
 
     uint32_t wrote = 0;
     nsresult rv = mSocketOut->Write(buffer, toWrite, &wrote);
     LOG(("nsWebSocketConnection::OnOutputStreamReady: write %u rv %" PRIx32,
          wrote, static_cast<uint32_t>(rv)));
 
     if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
       mSocketOut->AsyncWait(this, 0, 0, mEventTarget);
+      mOutputStreamBlocked = true;
       return NS_OK;
     }
 
     if (NS_FAILED(rv)) {
       LOG(("nsWebSocketConnection::OnOutputStreamReady %p failed %u\n", this,
            static_cast<uint32_t>(rv)));
       mListener->OnError(rv);
       return NS_OK;
@@ -243,10 +251,12 @@ nsWebSocketConnection::OnOutputStreamRea
     mWriteOffset += wrote;
 
     if (toWrite == wrote) {
       mWriteOffset = 0;
       mOutputQueue.pop_front();
     }
   }
 
+  Unused << mListener->OnReadyToSendData();
+
   return NS_OK;
 }
--- a/netwerk/protocol/websocket/nsWebSocketConnection.h
+++ b/netwerk/protocol/websocket/nsWebSocketConnection.h
@@ -55,14 +55,15 @@ class nsWebSocketConnection : public nsI
   nsCOMPtr<nsIWebSocketConnectionListener> mListener;
   nsCOMPtr<nsISocketTransport> mTransport;
   nsCOMPtr<nsIAsyncInputStream> mSocketIn;
   nsCOMPtr<nsIAsyncOutputStream> mSocketOut;
   nsCOMPtr<nsIEventTarget> mEventTarget;
   size_t mWriteOffset;
   std::list<OutputData> mOutputQueue;
   bool mStartReadingCalled;
+  bool mOutputStreamBlocked;
 };
 
 }  // namespace net
 }  // namespace mozilla
 
 #endif  // mozilla_net_nsWebSocketConnection_h