Bug 1059081 - Add a threadsafe wrapper for persistent nsMultiplexStream queues. r=nfroyd, a=ritu
authorJosh Matthews <josh@joshmatthews.net>
Thu, 28 May 2015 13:23:56 -0400
changeset 266125 a8c1768e0d87
parent 266124 befc7b294081
child 266126 01c9c80931d5
push id4762
push userryanvm@gmail.com
push date2015-05-28 18:54 +0000
treeherdermozilla-beta@dc9c305024f4 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersnfroyd, ritu
bugs1059081
milestone39.0
Bug 1059081 - Add a threadsafe wrapper for persistent nsMultiplexStream queues. r=nfroyd, a=ritu
xpcom/io/nsMultiplexInputStream.cpp
--- a/xpcom/io/nsMultiplexInputStream.cpp
+++ b/xpcom/io/nsMultiplexInputStream.cpp
@@ -6,16 +6,17 @@
 
 /**
  * The multiplex stream concatenates a list of input streams into a single
  * stream.
  */
 
 #include "mozilla/Attributes.h"
 #include "mozilla/MathAlgorithms.h"
+#include "mozilla/Mutex.h"
 
 #include "base/basictypes.h"
 
 #include "nsMultiplexInputStream.h"
 #include "nsIMultiplexInputStream.h"
 #include "nsISeekableStream.h"
 #include "nsCOMPtr.h"
 #include "nsCOMArray.h"
@@ -54,16 +55,17 @@ private:
     void* mClosure;
     bool mDone;
   };
 
   static NS_METHOD ReadSegCb(nsIInputStream* aIn, void* aClosure,
                              const char* aFromRawSegment, uint32_t aToOffset,
                              uint32_t aCount, uint32_t* aWriteCount);
 
+  Mutex mLock; // Protects access to all data members.
   nsTArray<nsCOMPtr<nsIInputStream>> mStreams;
   uint32_t mCurrentStream;
   bool mStartedReadingCurrent;
   nsresult mStatus;
 };
 
 NS_IMPL_ADDREF(nsMultiplexInputStream)
 NS_IMPL_RELEASE(nsMultiplexInputStream)
@@ -114,26 +116,28 @@ TellMaybeSeek(nsISeekableStream* aSeekab
     if (NS_SUCCEEDED(rv)) {
       rv = aSeekable->Tell(aResult);
     }
   }
   return rv;
 }
 
 nsMultiplexInputStream::nsMultiplexInputStream()
-  : mCurrentStream(0),
+  : mLock("nsMultiplexInputStream lock"),
+    mCurrentStream(0),
     mStartedReadingCurrent(false),
     mStatus(NS_OK)
 {
 }
 
 /* readonly attribute unsigned long count; */
 NS_IMETHODIMP
 nsMultiplexInputStream::GetCount(uint32_t* aCount)
 {
+  MutexAutoLock lock(mLock);
   *aCount = mStreams.Length();
   return NS_OK;
 }
 
 #ifdef DEBUG
 static bool
 SeekableStreamAtBeginning(nsIInputStream* aStream)
 {
@@ -145,66 +149,71 @@ SeekableStreamAtBeginning(nsIInputStream
   return true;
 }
 #endif
 
 /* void appendStream (in nsIInputStream stream); */
 NS_IMETHODIMP
 nsMultiplexInputStream::AppendStream(nsIInputStream* aStream)
 {
+  MutexAutoLock lock(mLock);
   NS_ASSERTION(SeekableStreamAtBeginning(aStream),
                "Appended stream not at beginning.");
   return mStreams.AppendElement(aStream) ? NS_OK : NS_ERROR_OUT_OF_MEMORY;
 }
 
 /* void insertStream (in nsIInputStream stream, in unsigned long index); */
 NS_IMETHODIMP
 nsMultiplexInputStream::InsertStream(nsIInputStream* aStream, uint32_t aIndex)
 {
+  MutexAutoLock lock(mLock);
   NS_ASSERTION(SeekableStreamAtBeginning(aStream),
                "Inserted stream not at beginning.");
   mStreams.InsertElementAt(aIndex, aStream);
   if (mCurrentStream > aIndex ||
       (mCurrentStream == aIndex && mStartedReadingCurrent)) {
     ++mCurrentStream;
   }
   return NS_OK;
 }
 
 /* void removeStream (in unsigned long index); */
 NS_IMETHODIMP
 nsMultiplexInputStream::RemoveStream(uint32_t aIndex)
 {
+  MutexAutoLock lock(mLock);
   mStreams.RemoveElementAt(aIndex);
   if (mCurrentStream > aIndex) {
     --mCurrentStream;
   } else if (mCurrentStream == aIndex) {
     mStartedReadingCurrent = false;
   }
 
   return NS_OK;
 }
 
 /* nsIInputStream getStream (in unsigned long index); */
 NS_IMETHODIMP
 nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream** aResult)
 {
+  MutexAutoLock lock(mLock);
   *aResult = mStreams.SafeElementAt(aIndex, nullptr);
   if (NS_WARN_IF(!*aResult)) {
     return NS_ERROR_NOT_AVAILABLE;
   }
 
   NS_ADDREF(*aResult);
   return NS_OK;
 }
 
 /* void close (); */
 NS_IMETHODIMP
 nsMultiplexInputStream::Close()
 {
+  MutexAutoLock lock(mLock);
   mStatus = NS_BASE_STREAM_CLOSED;
 
   nsresult rv = NS_OK;
 
   uint32_t len = mStreams.Length();
   for (uint32_t i = 0; i < len; ++i) {
     nsresult rv2 = mStreams[i]->Close();
     // We still want to close all streams, but we should return an error
@@ -214,16 +223,17 @@ nsMultiplexInputStream::Close()
   }
   return rv;
 }
 
 /* unsigned long long available (); */
 NS_IMETHODIMP
 nsMultiplexInputStream::Available(uint64_t* aResult)
 {
+  MutexAutoLock lock(mLock);
   if (NS_FAILED(mStatus)) {
     return mStatus;
   }
 
   nsresult rv;
   uint64_t avail = 0;
 
   uint32_t len = mStreams.Length();
@@ -238,16 +248,17 @@ nsMultiplexInputStream::Available(uint64
   *aResult = avail;
   return NS_OK;
 }
 
 /* [noscript] unsigned long read (in charPtr buf, in unsigned long count); */
 NS_IMETHODIMP
 nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
 {
+  MutexAutoLock lock(mLock);
   // It is tempting to implement this method in terms of ReadSegments, but
   // that would prevent this class from being used with streams that only
   // implement Read (e.g., file streams).
 
   *aResult = 0;
 
   if (mStatus == NS_BASE_STREAM_CLOSED) {
     return NS_OK;
@@ -289,16 +300,18 @@ nsMultiplexInputStream::Read(char* aBuf,
 
 /* [noscript] unsigned long readSegments (in nsWriteSegmentFun writer,
  *                                        in voidPtr closure,
  *                                        in unsigned long count); */
 NS_IMETHODIMP
 nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                                      uint32_t aCount, uint32_t* aResult)
 {
+  MutexAutoLock lock(mLock);
+
   if (mStatus == NS_BASE_STREAM_CLOSED) {
     *aResult = 0;
     return NS_OK;
   }
   if (NS_FAILED(mStatus)) {
     return mStatus;
   }
 
@@ -366,16 +379,18 @@ nsMultiplexInputStream::ReadSegCb(nsIInp
   }
   return rv;
 }
 
 /* readonly attribute boolean nonBlocking; */
 NS_IMETHODIMP
 nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking)
 {
+  MutexAutoLock lock(mLock);
+
   uint32_t len = mStreams.Length();
   if (len == 0) {
     // Claim to be non-blocking, since we won't block the caller.
     // On the other hand we'll never return NS_BASE_STREAM_WOULD_BLOCK,
     // so maybe we should claim to be blocking?  It probably doesn't
     // matter in practice.
     *aNonBlocking = true;
     return NS_OK;
@@ -394,16 +409,18 @@ nsMultiplexInputStream::IsNonBlocking(bo
   }
   return NS_OK;
 }
 
 /* void seek (in int32_t whence, in int32_t offset); */
 NS_IMETHODIMP
 nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset)
 {
+  MutexAutoLock lock(mLock);
+
   if (NS_FAILED(mStatus)) {
     return mStatus;
   }
 
   nsresult rv;
 
   uint32_t oldCurrentStream = mCurrentStream;
   bool oldStartedReadingCurrent = mStartedReadingCurrent;
@@ -635,16 +652,18 @@ nsMultiplexInputStream::Seek(int32_t aWh
   // other Seeks not implemented yet
   return NS_ERROR_NOT_IMPLEMENTED;
 }
 
 /* uint32_t tell (); */
 NS_IMETHODIMP
 nsMultiplexInputStream::Tell(int64_t* aResult)
 {
+  MutexAutoLock lock(mLock);
+
   if (NS_FAILED(mStatus)) {
     return mStatus;
   }
 
   nsresult rv;
   int64_t ret64 = 0;
   uint32_t i, last;
   last = mStartedReadingCurrent ? mCurrentStream + 1 : mCurrentStream;
@@ -692,16 +711,18 @@ nsMultiplexInputStreamConstructor(nsISup
 
   return rv;
 }
 
 void
 nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
                                   FileDescriptorArray& aFileDescriptors)
 {
+  MutexAutoLock lock(mLock);
+
   MultiplexInputStreamParams params;
 
   uint32_t streamCount = mStreams.Length();
 
   if (streamCount) {
     InfallibleTArray<InputStreamParams>& streams = params.streams();
 
     streams.SetCapacity(streamCount);