Bug 1415081 - part 0 - NS_ReadInputStreamToString must support nsIAsyncInputStream, r=smaug
authorAndrea Marchesini <amarchesini@mozilla.com>
Thu, 09 Nov 2017 11:18:08 +0100
changeset 444202 057c4132d24848a144135516b2a5f11ee7cc29c3
parent 444200 176a739c66b9ebd4098131339c0ed12a192c7dc9
child 444203 7cf6867bed423f52c684994c77cd1db376de3bf1
push id1618
push userCallek@gmail.com
push dateThu, 11 Jan 2018 17:45:48 +0000
treeherdermozilla-release@882ca853e05a [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerssmaug
bugs1415081
milestone58.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1415081 - part 0 - NS_ReadInputStreamToString must support nsIAsyncInputStream, r=smaug
netwerk/base/nsNetUtil.cpp
netwerk/base/nsNetUtil.h
--- a/netwerk/base/nsNetUtil.cpp
+++ b/netwerk/base/nsNetUtil.cpp
@@ -8,16 +8,18 @@
 #include "HttpLog.h"
 
 #include "nsNetUtil.h"
 
 #include "mozilla/Encoding.h"
 #include "mozilla/LoadContext.h"
 #include "mozilla/LoadInfo.h"
 #include "mozilla/BasePrincipal.h"
+#include "mozilla/Monitor.h"
+#include "mozilla/TaskQueue.h"
 #include "mozilla/Telemetry.h"
 #include "nsCategoryCache.h"
 #include "nsContentUtils.h"
 #include "nsHashKeys.h"
 #include "nsHttp.h"
 #include "nsIAsyncStreamCopier.h"
 #include "nsIAuthPrompt.h"
 #include "nsIAuthPrompt2.h"
@@ -1406,55 +1408,369 @@ NS_NewPostDataStream(nsIInputStream  **r
     rv = stream->SetData(data.BeginReading(), data.Length());
     if (NS_FAILED(rv))
         return rv;
 
     stream.forget(result);
     return NS_OK;
 }
 
+namespace {
+
+#define BUFFER_SIZE 4096
+
+class BufferWriter final : public Runnable
+                         , public nsIInputStreamCallback
+{
+public:
+    NS_DECL_ISUPPORTS_INHERITED
+
+    BufferWriter(nsIInputStream* aInputStream,
+                 void* aBuffer, int64_t aCount)
+        : Runnable("BufferWriterRunnable")
+        , mMonitor("BufferWriter.mMonitor")
+        , mInputStream(aInputStream)
+        , mBuffer(aBuffer)
+        , mCount(aCount)
+        , mWrittenData(0)
+        , mBufferType(mBuffer ? eExternal : eInternal)
+        , mAsyncResult(NS_OK)
+        , mBufferSize(0)
+    {
+        MOZ_ASSERT(aInputStream);
+        MOZ_ASSERT(aCount == -1 || aCount > 0);
+        MOZ_ASSERT_IF(mBuffer, aCount > 0);
+    }
+
+    nsresult
+    Write()
+    {
+        // Let's make the inputStream buffered if it's not.
+        if (!NS_InputStreamIsBuffered(mInputStream)) {
+            nsCOMPtr<nsIInputStream> bufferedStream;
+            nsresult rv =
+                NS_NewBufferedInputStream(getter_AddRefs(bufferedStream),
+                                          mInputStream.forget(), BUFFER_SIZE);
+            NS_ENSURE_SUCCESS(rv, rv);
+
+            mInputStream = bufferedStream;
+        }
+
+        mAsyncInputStream = do_QueryInterface(mInputStream);
+
+        if (!mAsyncInputStream) {
+            return WriteSync();
+        }
+
+        // Let's use mAsyncInputStream only.
+        mInputStream = nullptr;
+
+        return WriteAsync();
+    }
+
+    uint64_t
+    WrittenData() const
+    {
+        return mWrittenData;
+    }
+
+    void*
+    StealBuffer()
+    {
+        MOZ_ASSERT(mBufferType == eInternal);
+
+        void* buffer = mBuffer;
+        mBuffer = nullptr;
+
+        return buffer;
+    }
+
+private:
+    ~BufferWriter()
+    {
+        if (mBuffer && mBufferType == eInternal) {
+            free(mBuffer);
+        }
+
+        if (mTaskQueue) {
+            mTaskQueue->BeginShutdown();
+        }
+    }
+
+    nsresult
+    WriteSync()
+    {
+        uint64_t length = (uint64_t)mCount;
+
+        if (mCount == -1) {
+            nsresult rv = mInputStream->Available(&length);
+            NS_ENSURE_SUCCESS(rv, rv);
+
+            if (length == 0) {
+                // nothing to read.
+                return NS_OK;
+            }
+        }
+
+        if (mBufferType == eInternal) {
+            mBuffer = malloc(length);
+            if (NS_WARN_IF(!mBuffer)) {
+                return NS_ERROR_OUT_OF_MEMORY;
+            }
+        }
+
+        uint32_t writtenData;
+        nsresult rv = mInputStream->ReadSegments(NS_CopySegmentToBuffer,
+                                                 mBuffer, length,
+                                                 &writtenData);
+        NS_ENSURE_SUCCESS(rv, rv);
+
+        mWrittenData = writtenData;
+        return NS_OK;
+    }
+
+    nsresult
+    WriteAsync()
+    {
+        if (mCount > 0 && mBufferType == eInternal) {
+            mBuffer = malloc(mCount);
+            if (NS_WARN_IF(!mBuffer)) {
+                return NS_ERROR_OUT_OF_MEMORY;
+            }
+        }
+
+        nsCOMPtr<nsIEventTarget> target =
+            do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID);
+        if (!target) {
+            return NS_ERROR_FAILURE;
+        }
+
+        mTaskQueue = new TaskQueue(target.forget());
+
+        MonitorAutoLock lock(mMonitor);
+
+        nsCOMPtr<nsIRunnable> runnable = this;
+        nsresult rv = mTaskQueue->Dispatch(runnable.forget(),
+                                           AbstractThread::DontAssertDispatchSuccess);
+        NS_ENSURE_SUCCESS(rv, rv);
+
+        lock.Wait();
+
+        return mAsyncResult;
+    }
+
+    // This method runs on the I/O Thread when the owning thread is blocked by
+    // the mMonitor. It is called multiple times until mCount is greater than 0
+    // or until there is something to read in the stream.
+    NS_IMETHOD
+    Run() override
+    {
+        MOZ_ASSERT(mAsyncInputStream);
+        MOZ_ASSERT(!mInputStream);
+
+        if (mCount == 0) {
+            OperationCompleted(NS_OK);
+            return NS_OK;
+        }
+
+        if (mCount == -1 && !MaybeExpandBufferSize()) {
+            OperationCompleted(NS_ERROR_OUT_OF_MEMORY);
+            return NS_OK;
+        }
+
+        uint64_t offset = mWrittenData;
+        uint64_t length = mCount == -1 ? BUFFER_SIZE : mCount;
+
+        // Let's try to read it directly.
+        uint32_t writtenData;
+        nsresult rv = mAsyncInputStream->ReadSegments(NS_CopySegmentToBuffer,
+                                                     static_cast<char*>(mBuffer) + offset,
+                                                     length, &writtenData);
+
+        // Operation completed.
+        if (NS_SUCCEEDED(rv) && writtenData == 0) {
+            OperationCompleted(NS_OK);
+            return NS_OK;
+        }
+
+        // If we succeeded, let's try to read again.
+        if (NS_SUCCEEDED(rv)) {
+            mWrittenData += writtenData;
+            if (mCount != -1) {
+                MOZ_ASSERT(mCount >= writtenData);
+                mCount -= writtenData;
+            }
+
+            nsCOMPtr<nsIRunnable> runnable = this;
+            rv = mTaskQueue->Dispatch(runnable.forget(),
+                                      AbstractThread::DontAssertDispatchSuccess);
+            if (NS_WARN_IF(NS_FAILED(rv))) {
+                OperationCompleted(rv);
+            }
+        
+            return NS_OK;
+        }
+
+        // Async wait...
+        if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
+            rv = mAsyncInputStream->AsyncWait(this, 0, length, mTaskQueue);
+            if (NS_WARN_IF(NS_FAILED(rv))) {
+                OperationCompleted(rv);
+            }
+            return NS_OK;
+        }
+
+        // Error.
+        OperationCompleted(rv);
+        return NS_OK;
+    }
+
+    NS_IMETHOD
+    OnInputStreamReady(nsIAsyncInputStream* aStream) override
+    {
+        MOZ_ASSERT(aStream == mAsyncInputStream);
+        // The stream is ready, let's read it again.
+        return Run();
+    }
+
+    // This function is called from the I/O thread and it will unblock the
+    // owning thread.
+    void
+    OperationCompleted(nsresult aRv)
+    {
+        MonitorAutoLock lock(mMonitor);
+
+        mAsyncResult = aRv;
+
+        // This will unlock the owning thread.
+        lock.Notify();
+    }
+
+    bool
+    MaybeExpandBufferSize()
+    {
+        MOZ_ASSERT(mCount == -1);
+
+        if (mBufferSize >= mWrittenData + BUFFER_SIZE) {
+            // The buffer is big enough.
+            return true;
+        }
+
+        CheckedUint32 bufferSize =
+            std::max<uint32_t>(static_cast<uint32_t>(mWrittenData),
+                               BUFFER_SIZE);
+        while (bufferSize.isValid() &&
+               bufferSize.value() < mWrittenData + BUFFER_SIZE) {
+            bufferSize *= 2;
+        }
+
+        if (!bufferSize.isValid()) {
+            return false;
+        }
+
+        void* buffer = realloc(mBuffer, bufferSize.value());
+        if (!buffer) {
+            return false;
+        }
+
+        mBuffer = buffer;
+        mBufferSize = bufferSize.value();
+        return true;
+    }
+
+    Monitor mMonitor;
+
+    nsCOMPtr<nsIInputStream> mInputStream;
+    nsCOMPtr<nsIAsyncInputStream> mAsyncInputStream;
+
+    RefPtr<TaskQueue> mTaskQueue;
+
+    void* mBuffer;
+    int64_t mCount;
+    uint64_t mWrittenData;
+
+    enum {
+        // The buffer is allocated internally and this object must release it
+        // in the DTOR if not stolen. The buffer can be reallocated.
+        eInternal,
+
+        // The buffer is not owned by this object and it cannot be reallocated.
+        eExternal,
+    } mBufferType;
+
+    // The following set if needed for the async read.
+    nsresult mAsyncResult;
+    uint64_t mBufferSize;
+};
+
+NS_IMPL_ISUPPORTS_INHERITED(BufferWriter, Runnable, nsIInputStreamCallback)
+
+} // anonymous namespace
+
 nsresult
-NS_ReadInputStreamToBuffer(nsIInputStream *aInputStream,
-                           void **aDest,
-                           uint32_t aCount)
+NS_ReadInputStreamToBuffer(nsIInputStream* aInputStream,
+                           void** aDest,
+                           int64_t aCount,
+                           uint64_t* aWritten)
 {
-    nsresult rv;
+    MOZ_ASSERT(aInputStream);
+    MOZ_ASSERT(aCount >= -1);
+
+    uint64_t dummyWritten;
+    if (!aWritten) {
+        aWritten = &dummyWritten;
+    }
+
+    if (aCount == 0) {
+        *aWritten = 0;
+        return NS_OK;
+    }
+
+    // This will take care of allocating and reallocating aDest.
+    RefPtr<BufferWriter> writer =
+       new BufferWriter(aInputStream, *aDest, aCount);
+
+    nsresult rv = writer->Write();
+    NS_ENSURE_SUCCESS(rv, rv);
+
+    *aWritten = writer->WrittenData();
 
     if (!*aDest) {
-        *aDest = malloc(aCount);
-        if (!*aDest)
-            return NS_ERROR_OUT_OF_MEMORY;
+        *aDest = writer->StealBuffer();
     }
 
-    char * p = reinterpret_cast<char*>(*aDest);
-    uint32_t bytesRead;
-    uint32_t totalRead = 0;
-    while (1) {
-        rv = aInputStream->Read(p + totalRead, aCount - totalRead, &bytesRead);
-        if (!NS_SUCCEEDED(rv))
-            return rv;
-        totalRead += bytesRead;
-        if (totalRead == aCount)
-            break;
-        // if Read reads 0 bytes, we've hit EOF
-        if (bytesRead == 0)
-            return NS_ERROR_UNEXPECTED;
-    }
-    return rv;
+    return NS_OK;
 }
 
 nsresult
-NS_ReadInputStreamToString(nsIInputStream *aInputStream,
-                           nsACString &aDest,
-                           uint32_t aCount)
+NS_ReadInputStreamToString(nsIInputStream* aInputStream,
+                           nsACString& aDest,
+                           int64_t aCount,
+                           uint64_t* aWritten)
 {
-    if (!aDest.SetLength(aCount, mozilla::fallible))
-        return NS_ERROR_OUT_OF_MEMORY;
-    void* dest = aDest.BeginWriting();
-    return NS_ReadInputStreamToBuffer(aInputStream, &dest, aCount);
+    uint64_t dummyWritten;
+    if (!aWritten) {
+        aWritten = &dummyWritten;
+    }
+
+    void* dest = nullptr;
+    nsresult rv = NS_ReadInputStreamToBuffer(aInputStream, &dest, aCount,
+                                             aWritten);
+    MOZ_ASSERT_IF(NS_FAILED(rv), dest == nullptr);
+
+    NS_ENSURE_SUCCESS(rv, rv);
+
+    if (!dest) {
+      MOZ_ASSERT(*aWritten == 0);
+      aDest.Truncate();
+      return NS_OK;
+    }
+
+    aDest.Adopt(reinterpret_cast<char*>(dest), *aWritten);
+    return NS_OK;
 }
 
 nsresult
 NS_NewURI(nsIURI **result,
           const nsACString &spec,
           const char *charset /* = nullptr */,
           nsIURI *baseURI /* = nullptr */,
           nsIIOService *ioService /* = nullptr */)     // pass in nsIIOService to optimize callers
--- a/netwerk/base/nsNetUtil.h
+++ b/netwerk/base/nsNetUtil.h
@@ -521,23 +521,43 @@ nsresult NS_NewBufferedOutputStream(nsIO
                                     already_AddRefed<nsIOutputStream> aOutputStream,
                                     uint32_t aBufferSize);
 
 // returns an input stream compatible with nsIUploadChannel::SetUploadStream()
 nsresult NS_NewPostDataStream(nsIInputStream  **result,
                               bool              isFile,
                               const nsACString &data);
 
+/**
+ * This function reads an inputStream and stores its content into a buffer. In
+ * general, you should avoid using this function because, it blocks the current
+ * thread until the operation is done.
+ * If the inputStream is async, the reading happens on an I/O thread.
+ *
+ * @param aInputStream the inputStream.
+ * @param aDest the destination buffer. if *aDest is null, it will be allocated
+ *              with the size of the written data. if aDest is not null, aCount
+ *              must greater than 0.
+ * @param aCount the amount of data to read. Use -1 if you want that all the
+ *               stream is read.
+ * @param aWritten this pointer will be used to store the number of data
+ *                 written in the buffer. If you don't need, pass nullptr.
+ */
 nsresult NS_ReadInputStreamToBuffer(nsIInputStream *aInputStream,
                                     void **aDest,
-                                    uint32_t aCount);
+                                    int64_t aCount,
+                                    uint64_t* aWritten = nullptr);
 
+/**
+ * See the comment for NS_ReadInputStreamToBuffer
+ */
 nsresult NS_ReadInputStreamToString(nsIInputStream *aInputStream,
                                     nsACString &aDest,
-                                    uint32_t aCount);
+                                    int64_t aCount,
+                                    uint64_t* aWritten = nullptr);
 
 nsresult
 NS_LoadPersistentPropertiesFromURISpec(nsIPersistentProperties **outResult,
                                        const nsACString         &aSpec);
 
 /**
  * NS_QueryNotificationCallbacks implements the canonical algorithm for
  * querying interfaces from a channel's notification callbacks.  It first