dom/presentation/PresentationTCPSessionTransport.cpp
author Chris Manchester <cmanchester@mozilla.com>
Wed, 25 Oct 2017 15:10:03 -0700
changeset 441546 da833838314c41b320ef43e335ad07c61b47465e
parent 436207 560296786c0cefbc749f54b20ba8afcd79ffd12a
child 441766 cb123c84818a3c491fb9d63f00891d23f515cf6f
permissions -rw-r--r--
Bug 1403346 - Use AC_SUBST_LIST for DSO_CFLAGS and DSO_PIC_CFLAGS r=glandium MozReview-Commit-ID: 7aJ7uOR6tjO

/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "nsArrayUtils.h"
#include "nsIAsyncStreamCopier.h"
#include "nsIInputStreamPump.h"
#include "nsIMultiplexInputStream.h"
#include "nsIMutableArray.h"
#include "nsIOutputStream.h"
#include "nsIPresentationControlChannel.h"
#include "nsIScriptableInputStream.h"
#include "nsISocketTransport.h"
#include "nsISocketTransportService.h"
#include "nsISupportsPrimitives.h"
#include "nsNetUtil.h"
#include "nsQueryObject.h"
#include "nsServiceManagerUtils.h"
#include "nsStreamUtils.h"
#include "nsThreadUtils.h"
#include "PresentationLog.h"
#include "PresentationTCPSessionTransport.h"

#define BUFFER_SIZE 65536

using namespace mozilla;
using namespace mozilla::dom;

class CopierCallbacks final : public nsIRequestObserver
{
public:
  explicit CopierCallbacks(PresentationTCPSessionTransport* aTransport)
    : mOwner(aTransport)
  {}

  NS_DECL_ISUPPORTS
  NS_DECL_NSIREQUESTOBSERVER
private:
  ~CopierCallbacks() {}

  RefPtr<PresentationTCPSessionTransport> mOwner;
};

NS_IMPL_ISUPPORTS(CopierCallbacks, nsIRequestObserver)

NS_IMETHODIMP
CopierCallbacks::OnStartRequest(nsIRequest* aRequest, nsISupports* aContext)
{
  return NS_OK;
}

NS_IMETHODIMP
CopierCallbacks::OnStopRequest(nsIRequest* aRequest, nsISupports* aContext, nsresult aStatus)
{
  mOwner->NotifyCopyComplete(aStatus);
  return NS_OK;
}

NS_IMPL_CYCLE_COLLECTION(PresentationTCPSessionTransport, mTransport,
                         mSocketInputStream, mSocketOutputStream,
                         mInputStreamPump, mInputStreamScriptable,
                         mMultiplexStream, mMultiplexStreamCopier, mCallback)

NS_IMPL_CYCLE_COLLECTING_ADDREF(PresentationTCPSessionTransport)
NS_IMPL_CYCLE_COLLECTING_RELEASE(PresentationTCPSessionTransport)

NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PresentationTCPSessionTransport)
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIPresentationSessionTransport)
  NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback)
  NS_INTERFACE_MAP_ENTRY(nsIPresentationSessionTransport)
  NS_INTERFACE_MAP_ENTRY(nsIPresentationSessionTransportBuilder)
  NS_INTERFACE_MAP_ENTRY(nsIPresentationTCPSessionTransportBuilder)
  NS_INTERFACE_MAP_ENTRY(nsIRequestObserver)
  NS_INTERFACE_MAP_ENTRY(nsIStreamListener)
  NS_INTERFACE_MAP_ENTRY(nsITransportEventSink)
NS_INTERFACE_MAP_END

PresentationTCPSessionTransport::PresentationTCPSessionTransport()
  : mReadyState(ReadyState::CLOSED)
  , mAsyncCopierActive(false)
  , mCloseStatus(NS_OK)
  , mDataNotificationEnabled(false)
{
}

PresentationTCPSessionTransport::~PresentationTCPSessionTransport()
{
}

NS_IMETHODIMP
PresentationTCPSessionTransport::BuildTCPSenderTransport(nsISocketTransport* aTransport,
                                                         nsIPresentationSessionTransportBuilderListener* aListener)
{
  if (NS_WARN_IF(!aTransport)) {
    return NS_ERROR_INVALID_ARG;
  }
  mTransport = aTransport;

  if (NS_WARN_IF(!aListener)) {
    return NS_ERROR_INVALID_ARG;
  }
  mListener = aListener;

  nsresult rv = CreateStream();
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  mRole = nsIPresentationService::ROLE_CONTROLLER;

  nsCOMPtr<nsIPresentationSessionTransport> sessionTransport = do_QueryObject(this);
  nsCOMPtr<nsIRunnable> onSessionTransportRunnable =
    NewRunnableMethod<nsIPresentationSessionTransport*>(
      "nsIPresentationSessionTransportBuilderListener::OnSessionTransport",
      mListener,
      &nsIPresentationSessionTransportBuilderListener::OnSessionTransport,
      sessionTransport);

  NS_DispatchToCurrentThread(onSessionTransportRunnable.forget());

  nsCOMPtr<nsIRunnable> setReadyStateRunnable = NewRunnableMethod<ReadyState>(
    "dom::PresentationTCPSessionTransport::SetReadyState",
    this,
    &PresentationTCPSessionTransport::SetReadyState,
    ReadyState::OPEN);
  return NS_DispatchToCurrentThread(setReadyStateRunnable.forget());
}

NS_IMETHODIMP
PresentationTCPSessionTransport::BuildTCPReceiverTransport(nsIPresentationChannelDescription* aDescription,
                                                           nsIPresentationSessionTransportBuilderListener* aListener)
{
  if (NS_WARN_IF(!aDescription)) {
    return NS_ERROR_INVALID_ARG;
  }

  if (NS_WARN_IF(!aListener)) {
    return NS_ERROR_INVALID_ARG;
  }
  mListener = aListener;

  uint16_t serverPort;
  nsresult rv = aDescription->GetTcpPort(&serverPort);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  nsCOMPtr<nsIArray> serverHosts;
  rv = aDescription->GetTcpAddress(getter_AddRefs(serverHosts));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  // TODO bug 1228504 Take all IP addresses in PresentationChannelDescription
  // into account. And at the first stage Presentation API is only exposed on
  // Firefox OS where the first IP appears enough for most scenarios.
  nsCOMPtr<nsISupportsCString> supportStr = do_QueryElementAt(serverHosts, 0);
  if (NS_WARN_IF(!supportStr)) {
    return NS_ERROR_INVALID_ARG;
  }

  nsAutoCString serverHost;
  supportStr->GetData(serverHost);
  if (serverHost.IsEmpty()) {
    return NS_ERROR_INVALID_ARG;
  }

  PRES_DEBUG("%s:ServerHost[%s],ServerPort[%d]\n", __func__, serverHost.get(), serverPort);

  SetReadyState(ReadyState::CONNECTING);

  nsCOMPtr<nsISocketTransportService> sts =
    do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID);
  if (NS_WARN_IF(!sts)) {
    return NS_ERROR_NOT_AVAILABLE;
  }
  rv = sts->CreateTransport(nullptr, 0, serverHost, serverPort, nullptr,
                            getter_AddRefs(mTransport));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  nsCOMPtr<nsIEventTarget> mainTarget = GetMainThreadEventTarget();
  mTransport->SetEventSink(this, mainTarget);

  rv = CreateStream();
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  mRole = nsIPresentationService::ROLE_RECEIVER;

  nsCOMPtr<nsIPresentationSessionTransport> sessionTransport = do_QueryObject(this);
  nsCOMPtr<nsIRunnable> runnable =
    NewRunnableMethod<nsIPresentationSessionTransport*>(
      "nsIPresentationSessionTransportBuilderListener::OnSessionTransport",
      mListener,
      &nsIPresentationSessionTransportBuilderListener::OnSessionTransport,
      sessionTransport);
  return NS_DispatchToCurrentThread(runnable.forget());
}

nsresult
PresentationTCPSessionTransport::CreateStream()
{
  nsresult rv = mTransport->OpenInputStream(0, 0, 0, getter_AddRefs(mSocketInputStream));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }
  rv = mTransport->OpenOutputStream(nsITransport::OPEN_UNBUFFERED, 0, 0, getter_AddRefs(mSocketOutputStream));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  // If the other side is not listening, we will get an |onInputStreamReady|
  // callback where |available| raises to indicate the connection was refused.
  nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(mSocketInputStream);
  if (NS_WARN_IF(!asyncStream)) {
    return NS_ERROR_NOT_AVAILABLE;
  }

  nsCOMPtr<nsIEventTarget> mainTarget = GetMainThreadEventTarget();
  rv = asyncStream->AsyncWait(this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, mainTarget);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  mInputStreamScriptable = do_CreateInstance("@mozilla.org/scriptableinputstream;1", &rv);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }
  rv = mInputStreamScriptable->Init(mSocketInputStream);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  mMultiplexStream = do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1", &rv);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }
  nsCOMPtr<nsIInputStream> stream = do_QueryInterface(mMultiplexStream);

  mMultiplexStreamCopier = do_CreateInstance("@mozilla.org/network/async-stream-copier;1", &rv);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  nsCOMPtr<nsISocketTransportService> sts =
    do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID);
  if (NS_WARN_IF(!sts)) {
    return NS_ERROR_NOT_AVAILABLE;
  }

  nsCOMPtr<nsIEventTarget> target = do_QueryInterface(sts);
  rv = mMultiplexStreamCopier->Init(stream,
                                    mSocketOutputStream,
                                    target,
                                    true, /* source buffered */
                                    false, /* sink buffered */
                                    BUFFER_SIZE,
                                    false, /* close source */
                                    false); /* close sink */
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  return NS_OK;
}

nsresult
PresentationTCPSessionTransport::CreateInputStreamPump()
{
  if (NS_WARN_IF(mInputStreamPump)) {
    return NS_OK;
  }

  nsresult rv;
  mInputStreamPump = do_CreateInstance(NS_INPUTSTREAMPUMP_CONTRACTID, &rv);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  rv = mInputStreamPump->Init(mSocketInputStream, 0, 0, false, nullptr);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  rv = mInputStreamPump->AsyncRead(this, nullptr);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  return NS_OK;
}

NS_IMETHODIMP
PresentationTCPSessionTransport::EnableDataNotification()
{
  if (NS_WARN_IF(!mCallback)) {
    return NS_ERROR_DOM_INVALID_STATE_ERR;
  }

  if (mDataNotificationEnabled) {
    return NS_OK;
  }

  mDataNotificationEnabled = true;

  if (IsReadyToNotifyData()) {
    return CreateInputStreamPump();
  }

  return NS_OK;
}

// nsIPresentationSessionTransportBuilderListener
NS_IMETHODIMP
PresentationTCPSessionTransport::GetCallback(nsIPresentationSessionTransportCallback** aCallback)
{
  nsCOMPtr<nsIPresentationSessionTransportCallback> callback = mCallback;
  callback.forget(aCallback);
  return NS_OK;
}

NS_IMETHODIMP
PresentationTCPSessionTransport::SetCallback(nsIPresentationSessionTransportCallback* aCallback)
{
  mCallback = aCallback;

  if (!!mCallback && ReadyState::OPEN == mReadyState) {
    // Notify the transport channel is ready.
    Unused << NS_WARN_IF(NS_FAILED(mCallback->NotifyTransportReady()));
  }

  return NS_OK;
}

NS_IMETHODIMP
PresentationTCPSessionTransport::GetSelfAddress(nsINetAddr** aSelfAddress)
{
  if (NS_WARN_IF(!mTransport)) {
    return NS_ERROR_DOM_INVALID_STATE_ERR;
  }

  return mTransport->GetScriptableSelfAddr(aSelfAddress);
}

void
PresentationTCPSessionTransport::EnsureCopying()
{
  if (mAsyncCopierActive) {
    return;
  }

  mAsyncCopierActive = true;
  RefPtr<CopierCallbacks> callbacks = new CopierCallbacks(this);
  Unused << NS_WARN_IF(NS_FAILED(mMultiplexStreamCopier->AsyncCopy(callbacks, nullptr)));
}

void
PresentationTCPSessionTransport::NotifyCopyComplete(nsresult aStatus)
{
  mAsyncCopierActive = false;
  mMultiplexStream->RemoveStream(0);
  if (NS_WARN_IF(NS_FAILED(aStatus))) {
    if (mReadyState != ReadyState::CLOSED) {
      mCloseStatus = aStatus;
      SetReadyState(ReadyState::CLOSED);
    }
    return;
  }

  uint32_t count;
  nsresult rv = mMultiplexStream->GetCount(&count);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return;
  }

  if (count) {
    EnsureCopying();
    return;
  }

  if (mReadyState == ReadyState::CLOSING) {
    mSocketOutputStream->Close();
    mCloseStatus = NS_OK;
    SetReadyState(ReadyState::CLOSED);
  }
}

NS_IMETHODIMP
PresentationTCPSessionTransport::Send(const nsAString& aData)
{
  if (NS_WARN_IF(mReadyState != ReadyState::OPEN)) {
    return NS_ERROR_DOM_INVALID_STATE_ERR;
  }

  nsresult rv;
  nsCOMPtr<nsIStringInputStream> stream =
    do_CreateInstance(NS_STRINGINPUTSTREAM_CONTRACTID, &rv);
  if(NS_WARN_IF(NS_FAILED(rv))) {
    return NS_ERROR_DOM_INVALID_STATE_ERR;
  }

  NS_ConvertUTF16toUTF8 msgString(aData);
  rv = stream->SetData(msgString.BeginReading(), msgString.Length());
  if(NS_WARN_IF(NS_FAILED(rv))) {
    return NS_ERROR_DOM_INVALID_STATE_ERR;
  }

  mMultiplexStream->AppendStream(stream);

  EnsureCopying();

  return NS_OK;
}

NS_IMETHODIMP
PresentationTCPSessionTransport::SendBinaryMsg(const nsACString& aData)
{
  return NS_ERROR_DOM_NOT_SUPPORTED_ERR;
}

NS_IMETHODIMP
PresentationTCPSessionTransport::SendBlob(nsIDOMBlob* aBlob)
{
  return NS_ERROR_DOM_NOT_SUPPORTED_ERR;
}

NS_IMETHODIMP
PresentationTCPSessionTransport::Close(nsresult aReason)
{
  PRES_DEBUG("%s:reason[%" PRIx32 "]\n", __func__, static_cast<uint32_t>(aReason));

  if (mReadyState == ReadyState::CLOSED || mReadyState == ReadyState::CLOSING) {
    return NS_OK;
  }

  mCloseStatus = aReason;
  SetReadyState(ReadyState::CLOSING);

  uint32_t count = 0;
  mMultiplexStream->GetCount(&count);
  if (!count) {
    mSocketOutputStream->Close();
  }

  mSocketInputStream->Close();
  mDataNotificationEnabled = false;

  mListener = nullptr;

  return NS_OK;
}

void
PresentationTCPSessionTransport::SetReadyState(ReadyState aReadyState)
{
  mReadyState = aReadyState;

  if (mReadyState == ReadyState::OPEN) {
    if (IsReadyToNotifyData()) {
      CreateInputStreamPump();
    }

    if (NS_WARN_IF(!mCallback)) {
      return;
    }

    // Notify the transport channel is ready.
    Unused << NS_WARN_IF(NS_FAILED(mCallback->NotifyTransportReady()));
  } else if (mReadyState == ReadyState::CLOSED && mCallback) {
    if (NS_WARN_IF(!mCallback)) {
      return;
    }

    // Notify the transport channel has been shut down.
    Unused <<
      NS_WARN_IF(NS_FAILED(mCallback->NotifyTransportClosed(mCloseStatus)));
    mCallback = nullptr;
  }
}

// nsITransportEventSink
NS_IMETHODIMP
PresentationTCPSessionTransport::OnTransportStatus(nsITransport* aTransport,
                                                   nsresult aStatus,
                                                   int64_t aProgress,
                                                   int64_t aProgressMax)
{
  PRES_DEBUG("%s:aStatus[%" PRIx32 "]\n", __func__, static_cast<uint32_t>(aStatus));

  MOZ_ASSERT(NS_IsMainThread());

  if (aStatus != NS_NET_STATUS_CONNECTED_TO) {
    return NS_OK;
  }

  SetReadyState(ReadyState::OPEN);

  return NS_OK;
}

// nsIInputStreamCallback
NS_IMETHODIMP
PresentationTCPSessionTransport::OnInputStreamReady(nsIAsyncInputStream* aStream)
{
  MOZ_ASSERT(NS_IsMainThread());

  // Only used for detecting if the connection was refused.
  uint64_t dummy;
  nsresult rv = aStream->Available(&dummy);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    if (mReadyState != ReadyState::CLOSED) {
      mCloseStatus = NS_ERROR_CONNECTION_REFUSED;
      SetReadyState(ReadyState::CLOSED);
    }
  }

  return NS_OK;
}

// nsIRequestObserver
NS_IMETHODIMP
PresentationTCPSessionTransport::OnStartRequest(nsIRequest* aRequest,
                                                nsISupports* aContext)
{
  // Do nothing.
  return NS_OK;
}

NS_IMETHODIMP
PresentationTCPSessionTransport::OnStopRequest(nsIRequest* aRequest,
                                               nsISupports* aContext,
                                               nsresult aStatusCode)
{
  PRES_DEBUG("%s:aStatusCode[%" PRIx32 "]\n", __func__, static_cast<uint32_t>(aStatusCode));

  MOZ_ASSERT(NS_IsMainThread());

  uint32_t count;
  nsresult rv = mMultiplexStream->GetCount(&count);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  mInputStreamPump = nullptr;

  if (count != 0 && NS_SUCCEEDED(aStatusCode)) {
    // If we have some buffered output still, and status is not an error, the
    // other side has done a half-close, but we don't want to be in the close
    // state until we are done sending everything that was buffered. We also
    // don't want to call |NotifyTransportClosed| yet.
    return NS_OK;
  }

  // We call this even if there is no error.
  if (mReadyState != ReadyState::CLOSED) {
    mCloseStatus = aStatusCode;
    SetReadyState(ReadyState::CLOSED);
  }
  return NS_OK;
}

// nsIStreamListener
NS_IMETHODIMP
PresentationTCPSessionTransport::OnDataAvailable(nsIRequest* aRequest,
                                                 nsISupports* aContext,
                                                 nsIInputStream* aStream,
                                                 uint64_t aOffset,
                                                 uint32_t aCount)
{
  MOZ_ASSERT(NS_IsMainThread());

  if (NS_WARN_IF(!mCallback)) {
    return NS_ERROR_NOT_AVAILABLE;
  }

  nsCString data;
  nsresult rv = mInputStreamScriptable->ReadBytes(aCount, data);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  // Pass the incoming data to the listener.
  return mCallback->NotifyData(data, false);
}