dom/cache/ReadStream.cpp
author Bogdan Tara <btara@mozilla.com>
Fri, 28 Sep 2018 02:42:20 +0300
changeset 438612 6cc26ea43938b62443c992908f0fbdd9d4333c29
parent 420862 b54db66223586b4e04f5cb926fccdacf8a176b91
child 448947 6f3709b3878117466168c40affa7bca0b60cf75b
permissions -rw-r--r--
Backed out changeset ba1fef7b14eb (bug 1493955) for GTest failures CLOSED TREE

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "mozilla/dom/cache/ReadStream.h"

#include "mozilla/Unused.h"
#include "mozilla/dom/cache/CacheStreamControlChild.h"
#include "mozilla/dom/cache/CacheStreamControlParent.h"
#include "mozilla/dom/cache/CacheTypes.h"
#include "mozilla/ipc/IPCStreamUtils.h"
#include "mozilla/SnappyUncompressInputStream.h"
#include "nsIAsyncInputStream.h"
#include "nsStringStream.h"
#include "nsTArray.h"

namespace mozilla {
namespace dom {
namespace cache {

using mozilla::Unused;
using mozilla::ipc::AutoIPCStream;
using mozilla::ipc::IPCStream;
using mozilla::ipc::OptionalIPCStream;

// ----------------------------------------------------------------------------

// The inner stream class.  This is where all of the real work is done.  As
// an invariant Inner::Close() must be called before ~Inner().  This is
// guaranteed by our outer ReadStream class.
class ReadStream::Inner final : public ReadStream::Controllable
{
public:
  Inner(StreamControl* aControl, const nsID& aId,
        nsIInputStream* aStream);

  void
  Serialize(CacheReadStreamOrVoid* aReadStreamOut,
            nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
            ErrorResult& aRv);

  void
  Serialize(CacheReadStream* aReadStreamOut,
            nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
            ErrorResult& aRv);

  // ReadStream::Controllable methods
  virtual void
  CloseStream() override;

  virtual void
  CloseStreamWithoutReporting() override;

  virtual bool
  MatchId(const nsID& aId) const override;

  virtual bool
  HasEverBeenRead() const override;

  // Simulate nsIInputStream methods, but we don't actually inherit from it
  nsresult
  Close();

  nsresult
  Available(uint64_t *aNumAvailableOut);

  nsresult
  Read(char *aBuf, uint32_t aCount, uint32_t *aNumReadOut);

  nsresult
  ReadSegments(nsWriteSegmentFun aWriter, void *aClosure, uint32_t aCount,
               uint32_t *aNumReadOut);

  nsresult
  IsNonBlocking(bool *aNonBlockingOut);

private:
  class NoteClosedRunnable;
  class ForgetRunnable;

  ~Inner();

  void
  NoteClosed();

  void
  Forget();

  void
  NoteClosedOnOwningThread();

  void
  ForgetOnOwningThread();

  nsIInputStream*
  EnsureStream();

  void
  AsyncOpenStreamOnOwningThread();

  void
  MaybeAbortAsyncOpenStream();

  void
  OpenStreamFailed();

  // Weak ref to the stream control actor.  The actor will always call either
  // CloseStream() or CloseStreamWithoutReporting() before it's destroyed.  The
  // weak ref is cleared in the resulting NoteClosedOnOwningThread() or
  // ForgetOnOwningThread() method call.
  StreamControl* mControl;

  const nsID mId;
  nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;

  enum State
  {
    Open,
    Closed,
    NumStates
  };
  Atomic<State> mState;
  Atomic<bool> mHasEverBeenRead;
  bool mAsyncOpenStarted;

  // The wrapped stream objects may not be threadsafe.  We need to be able
  // to close a stream on our owning thread while an IO thread is simultaneously
  // reading the same stream.  Therefore, protect all access to these stream
  // objects with a mutex.
  Mutex mMutex;
  CondVar mCondVar;
  nsCOMPtr<nsIInputStream> mStream;
  nsCOMPtr<nsIInputStream> mSnappyStream;

  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(cache::ReadStream::Inner, override)
};

// ----------------------------------------------------------------------------

// Runnable to notify actors that the ReadStream has closed.  This must
// be done on the thread associated with the PBackground actor.  Must be
// cancelable to execute on Worker threads (which can occur when the
// ReadStream is constructed on a child process Worker thread).
class ReadStream::Inner::NoteClosedRunnable final : public CancelableRunnable
{
public:
  explicit NoteClosedRunnable(ReadStream::Inner* aStream)
    : CancelableRunnable("dom::cache::ReadStream::Inner::NoteClosedRunnable")
    , mStream(aStream)
  { }

  NS_IMETHOD Run() override
  {
    mStream->NoteClosedOnOwningThread();
    mStream = nullptr;
    return NS_OK;
  }

  // Note, we must proceed with the Run() method since our actor will not
  // clean itself up until we note that the stream is closed.
  nsresult Cancel() override
  {
    Run();
    return NS_OK;
  }

private:
  ~NoteClosedRunnable() { }

  RefPtr<ReadStream::Inner> mStream;
};

// ----------------------------------------------------------------------------

// Runnable to clear actors without reporting that the ReadStream has
// closed.  Since this can trigger actor destruction, we need to do
// it on the thread associated with the PBackground actor.  Must be
// cancelable to execute on Worker threads (which can occur when the
// ReadStream is constructed on a child process Worker thread).
class ReadStream::Inner::ForgetRunnable final : public CancelableRunnable
{
public:
  explicit ForgetRunnable(ReadStream::Inner* aStream)
    : CancelableRunnable("dom::cache::ReadStream::Inner::ForgetRunnable")
    , mStream(aStream)
  { }

  NS_IMETHOD Run() override
  {
    mStream->ForgetOnOwningThread();
    mStream = nullptr;
    return NS_OK;
  }

  // Note, we must proceed with the Run() method so that we properly
  // call RemoveListener on the actor.
  nsresult Cancel() override
  {
    Run();
    return NS_OK;
  }

private:
  ~ForgetRunnable() { }

  RefPtr<ReadStream::Inner> mStream;
};

// ----------------------------------------------------------------------------

ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId,
                         nsIInputStream* aStream)
  : mControl(aControl)
  , mId(aId)
  , mOwningEventTarget(GetCurrentThreadSerialEventTarget())
  , mState(Open)
  , mHasEverBeenRead(false)
  , mAsyncOpenStarted(false)
  , mMutex("dom::cache::ReadStream")
  , mCondVar(mMutex, "dom::cache::ReadStream")
  , mStream(aStream)
  , mSnappyStream(aStream ? new SnappyUncompressInputStream(aStream) : nullptr)
{
  MOZ_DIAGNOSTIC_ASSERT(mControl);
  mControl->AddReadStream(this);
}

void
ReadStream::Inner::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
                             nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
                             ErrorResult& aRv)
{
  MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
  MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);
  *aReadStreamOut = CacheReadStream();
  Serialize(&aReadStreamOut->get_CacheReadStream(), aStreamCleanupList, aRv);
}

void
ReadStream::Inner::Serialize(CacheReadStream* aReadStreamOut,
                             nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
                             ErrorResult& aRv)
{
  MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
  MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);

  if (mState != Open) {
    aRv.ThrowTypeError<MSG_CACHE_STREAM_CLOSED>();
    return;
  }

  MOZ_DIAGNOSTIC_ASSERT(mControl);

  aReadStreamOut->id() = mId;
  mControl->SerializeControl(aReadStreamOut);

  {
    MutexAutoLock lock(mMutex);
    mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
  }

  MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut->stream().type() == OptionalIPCStream::Tvoid_t ||
                        aReadStreamOut->stream().get_IPCStream().type() ==
                        IPCStream::TInputStreamParamsWithFds);

  // We're passing ownership across the IPC barrier with the control, so
  // do not signal that the stream is closed here.
  Forget();
}

void
ReadStream::Inner::CloseStream()
{
  MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
  Close();
}

void
ReadStream::Inner::CloseStreamWithoutReporting()
{
  MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
  Forget();
}

bool
ReadStream::Inner::MatchId(const nsID& aId) const
{
  MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
  return mId.Equals(aId);
}

bool
ReadStream::Inner::HasEverBeenRead() const
{
  MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
  return mHasEverBeenRead;
}

nsresult
ReadStream::Inner::Close()
{
  // stream ops can happen on any thread
  nsresult rv = NS_OK;
  {
    MutexAutoLock lock(mMutex);
    if (mSnappyStream) {
      rv = mSnappyStream->Close();
    }
  }
  NoteClosed();
  return rv;
}

nsresult
ReadStream::Inner::Available(uint64_t* aNumAvailableOut)
{
  // stream ops can happen on any thread
  nsresult rv = NS_OK;
  {
    MutexAutoLock lock(mMutex);
    rv = EnsureStream()->Available(aNumAvailableOut);
  }

  if (NS_FAILED(rv)) {
    Close();
  }

  return rv;
}

nsresult
ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
{
  // stream ops can happen on any thread
  MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);

  nsresult rv = NS_OK;
  {
    MutexAutoLock lock(mMutex);
    rv = EnsureStream()->Read(aBuf, aCount, aNumReadOut);
  }

  if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) ||
      *aNumReadOut == 0) {
    Close();
  }

  mHasEverBeenRead = true;

  return rv;
}

nsresult
ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                                uint32_t aCount, uint32_t* aNumReadOut)
{
  // stream ops can happen on any thread
  MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);

  if (aCount) {
    mHasEverBeenRead = true;
  }


  nsresult rv = NS_OK;
  {
    MutexAutoLock lock(mMutex);
    rv = EnsureStream()->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
  }

  if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK &&
                        rv != NS_ERROR_NOT_IMPLEMENTED) || *aNumReadOut == 0) {
    Close();
  }

  // Verify bytes were actually read before marking as being ever read.  For
  // example, code can test if the stream supports ReadSegments() by calling
  // this method with a dummy callback which doesn't read anything.  We don't
  // want to trigger on that.
  if (*aNumReadOut) {
    mHasEverBeenRead = true;
  }

  return rv;
}

nsresult
ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut)
{
  // stream ops can happen on any thread
  MutexAutoLock lock(mMutex);
  if (mSnappyStream) {
    return mSnappyStream->IsNonBlocking(aNonBlockingOut);
  }
  *aNonBlockingOut = false;
  return NS_OK;
}

ReadStream::Inner::~Inner()
{
  // Any thread
  MOZ_DIAGNOSTIC_ASSERT(mState == Closed);
  MOZ_DIAGNOSTIC_ASSERT(!mControl);
}

void
ReadStream::Inner::NoteClosed()
{
  // Any thread
  if (mState == Closed) {
    return;
  }

  if (mOwningEventTarget->IsOnCurrentThread()) {
    NoteClosedOnOwningThread();
    return;
  }

  nsCOMPtr<nsIRunnable> runnable = new NoteClosedRunnable(this);
  MOZ_ALWAYS_SUCCEEDS(
    mOwningEventTarget->Dispatch(runnable.forget(), nsIThread::DISPATCH_NORMAL));
}

void
ReadStream::Inner::Forget()
{
  // Any thread
  if (mState == Closed) {
    return;
  }

  if (mOwningEventTarget->IsOnCurrentThread()) {
    ForgetOnOwningThread();
    return;
  }

  nsCOMPtr<nsIRunnable> runnable = new ForgetRunnable(this);
  MOZ_ALWAYS_SUCCEEDS(
    mOwningEventTarget->Dispatch(runnable.forget(), nsIThread::DISPATCH_NORMAL));
}

void
ReadStream::Inner::NoteClosedOnOwningThread()
{
  MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());

  // Mark closed and do nothing if we were already closed
  if (!mState.compareExchange(Open, Closed)) {
    return;
  }

  MaybeAbortAsyncOpenStream();

  MOZ_DIAGNOSTIC_ASSERT(mControl);
  mControl->NoteClosed(this, mId);
  mControl = nullptr;
}

void
ReadStream::Inner::ForgetOnOwningThread()
{
  MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());

  // Mark closed and do nothing if we were already closed
  if (!mState.compareExchange(Open, Closed)) {
    return;
  }

  MaybeAbortAsyncOpenStream();

  MOZ_DIAGNOSTIC_ASSERT(mControl);
  mControl->ForgetReadStream(this);
  mControl = nullptr;
}

nsIInputStream*
ReadStream::Inner::EnsureStream()
{
  mMutex.AssertCurrentThreadOwns();

  // We need to block the current thread while we open the stream.  We
  // cannot do this safely from the main owning thread since it would
  // trigger deadlock.  This should be ok, though, since a blocking
  // stream like this should never be read on the owning thread anyway.
  if (mOwningEventTarget->IsOnCurrentThread()) {
    MOZ_CRASH("Blocking read on the js/ipc owning thread!");
  }

  if (mSnappyStream) {
    return mSnappyStream;
  }

  nsCOMPtr<nsIRunnable> r =
    NewCancelableRunnableMethod("ReadStream::Inner::AsyncOpenStreamOnOwningThread",
                                this,
                                &ReadStream::Inner::AsyncOpenStreamOnOwningThread);
  nsresult rv = mOwningEventTarget->Dispatch(r.forget(),
                                             nsIThread::DISPATCH_NORMAL);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    OpenStreamFailed();
    return mSnappyStream;
  }

  mCondVar.Wait();
  MOZ_DIAGNOSTIC_ASSERT(mSnappyStream);

  return mSnappyStream;
}

void
ReadStream::Inner::AsyncOpenStreamOnOwningThread()
{
  MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());

  if (!mControl || mState == Closed) {
    MutexAutoLock lock(mMutex);
    OpenStreamFailed();
    mCondVar.NotifyAll();
    return;
  }

  if (mAsyncOpenStarted) {
    return;
  }
  mAsyncOpenStarted = true;

  RefPtr<ReadStream::Inner> self = this;
  mControl->OpenStream(mId, [self](nsCOMPtr<nsIInputStream>&& aStream) {
    MutexAutoLock lock(self->mMutex);
    self->mAsyncOpenStarted = false;
    if (!self->mStream) {
      if (!aStream) {
        self->OpenStreamFailed();
      } else {
        self->mStream = std::move(aStream);
        self->mSnappyStream = new SnappyUncompressInputStream(self->mStream);
      }
    }
    self->mCondVar.NotifyAll();
  });
}

void
ReadStream::Inner::MaybeAbortAsyncOpenStream()
{
  if (!mAsyncOpenStarted) {
    return;
  }

  MutexAutoLock lock(mMutex);
  OpenStreamFailed();
  mCondVar.NotifyAll();
}

void
ReadStream::Inner::OpenStreamFailed()
{
  MOZ_DIAGNOSTIC_ASSERT(!mStream);
  MOZ_DIAGNOSTIC_ASSERT(!mSnappyStream);
  mMutex.AssertCurrentThreadOwns();
  Unused << NS_NewCStringInputStream(getter_AddRefs(mStream), EmptyCString());
  mSnappyStream = mStream;
  mStream->Close();
  NoteClosed();
}

// ----------------------------------------------------------------------------

NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream);

// static
already_AddRefed<ReadStream>
ReadStream::Create(const CacheReadStreamOrVoid& aReadStreamOrVoid)
{
  if (aReadStreamOrVoid.type() == CacheReadStreamOrVoid::Tvoid_t) {
    return nullptr;
  }

  return Create(aReadStreamOrVoid.get_CacheReadStream());
}

// static
already_AddRefed<ReadStream>
ReadStream::Create(const CacheReadStream& aReadStream)
{
  // The parameter may or may not be for a Cache created stream.  The way we
  // tell is by looking at the stream control actor.  If the actor exists,
  // then we know the Cache created it.
  if (!aReadStream.controlChild() && !aReadStream.controlParent()) {
    return nullptr;
  }

  MOZ_DIAGNOSTIC_ASSERT(aReadStream.stream().type() == OptionalIPCStream::Tvoid_t ||
                        aReadStream.stream().get_IPCStream().type() ==
                        IPCStream::TInputStreamParamsWithFds);

  // Control is guaranteed to survive this method as ActorDestroy() cannot
  // run on this thread until we complete.
  StreamControl* control;
  if (aReadStream.controlChild()) {
    auto actor = static_cast<CacheStreamControlChild*>(aReadStream.controlChild());
    control = actor;
  } else {
    auto actor = static_cast<CacheStreamControlParent*>(aReadStream.controlParent());
    control = actor;
  }
  MOZ_DIAGNOSTIC_ASSERT(control);

  nsCOMPtr<nsIInputStream> stream = DeserializeIPCStream(aReadStream.stream());

  // Currently we expect all cache read streams to be blocking file streams.
#if defined(MOZ_DIAGNOSTIC_ASSERT_ENABLED)
  if (stream) {
    nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(stream);
    MOZ_DIAGNOSTIC_ASSERT(!asyncStream);
  }
#endif

  RefPtr<Inner> inner = new Inner(control, aReadStream.id(), stream);
  RefPtr<ReadStream> ref = new ReadStream(inner);
  return ref.forget();
}

// static
already_AddRefed<ReadStream>
ReadStream::Create(PCacheStreamControlParent* aControl, const nsID& aId,
                   nsIInputStream* aStream)
{
  MOZ_DIAGNOSTIC_ASSERT(aControl);
  auto actor = static_cast<CacheStreamControlParent*>(aControl);
  RefPtr<Inner> inner = new Inner(actor, aId, aStream);
  RefPtr<ReadStream> ref = new ReadStream(inner);
  return ref.forget();
}

void
ReadStream::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
                      nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
                      ErrorResult& aRv)
{
  mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv);
}

void
ReadStream::Serialize(CacheReadStream* aReadStreamOut,
                      nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
                      ErrorResult& aRv)
{
  mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv);
}

ReadStream::ReadStream(ReadStream::Inner* aInner)
  : mInner(aInner)
{
  MOZ_DIAGNOSTIC_ASSERT(mInner);
}

ReadStream::~ReadStream()
{
  // Explicitly close the inner stream so that it does not have to
  // deal with implicitly closing at destruction time.
  mInner->Close();
}

NS_IMETHODIMP
ReadStream::Close()
{
  return mInner->Close();
}

NS_IMETHODIMP
ReadStream::Available(uint64_t* aNumAvailableOut)
{
  return mInner->Available(aNumAvailableOut);
}

NS_IMETHODIMP
ReadStream::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
{
  return mInner->Read(aBuf, aCount, aNumReadOut);
}

NS_IMETHODIMP
ReadStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                         uint32_t aCount, uint32_t* aNumReadOut)
{
  return mInner->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
}

NS_IMETHODIMP
ReadStream::IsNonBlocking(bool* aNonBlockingOut)
{
  return mInner->IsNonBlocking(aNonBlockingOut);
}

} // namespace cache
} // namespace dom
} // namespace mozilla