media/mtransport/transportflow.cpp
author Mike Hommey <mh+mozilla@glandium.org>
Thu, 11 May 2017 13:23:07 +0900
changeset 358569 faae5fbdd01ddaf171810c446a645ea00085aa54
parent 342628 dcd3b1f4bc69dcd481f53ff84de46bb6702f936c
child 401421 2a59e9ad8ba07aa2bd0e9862b80735f0f91b4273
permissions -rw-r--r--
Bug 1363992 - Remove jemalloc 4. r=njn

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */

// Original author: ekr@rtfm.com
#include <deque>

#include "logging.h"
#include "runnable_utils.h"
#include "transportflow.h"
#include "transportlayer.h"

namespace mozilla {

MOZ_MTLOG_MODULE("mtransport")

NS_IMPL_ISUPPORTS0(TransportFlow)

// The destructor may run away from the flow's own thread, so the layers are
// not deleted in place: their container is handed to a runnable that is
// submitted to |target_|.
TransportFlow::~TransportFlow() {
  // If we are being destroyed off the right thread, nobody may still be
  // attached to our signals.
  const bool on_right_thread = CheckThreadInt();
  if (!on_right_thread) {
    MOZ_ASSERT(SignalStateChange.is_empty());
    MOZ_ASSERT(SignalPacketReceived.is_empty());
  }

  // Hand the actual layer destruction to the STS thread. Someone could in
  // principle still be touching this object, but with smart pointer
  // discipline in place it should not be possible to access and destroy it
  // at the same time. Moving the deque into an nsAutoPtr means the container
  // itself is freed automatically once DestroyFinal returns.
  nsAutoPtr<std::deque<TransportLayer*>> doomed_layers(layers_.release());
  RUN_ON_THREAD(target_,
                WrapRunnableNM(&TransportFlow::DestroyFinal, doomed_layers),
                NS_DISPATCH_NORMAL);
}

// Runnable body scheduled by the destructor: deletes every layer held in
// |layers|. The deque itself goes away with the nsAutoPtr when this returns.
void TransportFlow::DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers) {
  std::deque<TransportLayer *> *doomed = layers.get();
  ClearLayers(doomed);
}

// Deletes every layer in |layers| from the front onwards, leaving the queue
// empty. The queue object itself is not freed.
void TransportFlow::ClearLayers(std::queue<TransportLayer *>* layers) {
  // Each layer is deleted before it is popped, matching the deque overload.
  for (; !layers->empty(); layers->pop()) {
    delete layers->front();
  }
}

// Deletes every layer in |layers| from the front onwards, leaving the deque
// empty. The deque object itself is not freed.
void TransportFlow::ClearLayers(std::deque<TransportLayer *>* layers) {
  while (!layers->empty()) {
    // Delete first, then drop the now-dangling entry.
    TransportLayer *doomed = layers->front();
    delete doomed;
    layers->pop_front();
  }
}

// Initializes |layer| and installs it as the new front (top) of the flow.
//
// Ownership of |layer| is taken on entry: it is deleted on every failure
// path and stored in |layers_| on success.
//
// Returns:
//   NS_ERROR_FAILURE  if the flow is already in TS_ERROR (nothing is pushed).
//   layer->Init()'s failure code if initialization fails; in that case all
//                     existing layers are destroyed and the flow moves to
//                     TS_ERROR.
//   NS_OK             on success.
nsresult TransportFlow::PushLayer(TransportLayer *layer) {
  CheckThread();
  UniquePtr<TransportLayer> layer_tmp(layer);  // Destroy on failure.

  // Don't allow pushes once we are in error state.
  if (state_ == TransportLayer::TS_ERROR) {
    MOZ_MTLOG(ML_ERROR,
              id_ << ": Can't call PushLayer in error state for flow");
    return NS_ERROR_FAILURE;
  }

  nsresult rv = layer->Init();
  if (NS_FAILED(rv)) {
    // Destroy the rest of the flow, because it's no longer in an acceptable
    // state.
    ClearLayers(layers_.get());

    // Set ourselves to have failed.
    MOZ_MTLOG(ML_ERROR, id_ << ": Layer initialization failed; invalidating");
    StateChangeInt(TransportLayer::TS_ERROR);

    return rv;
  }
  EnsureSameThread(layer);

  // The current top (if any) becomes the layer directly below the new one.
  TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();

  // Re-target my signals to the new layer
  if (old_layer) {
    old_layer->SignalStateChange.disconnect(this);
    old_layer->SignalPacketReceived.disconnect(this);
  }
  // From here on |layers_| owns the layer; |layer| stays valid as an alias.
  layers_->push_front(layer_tmp.release());
  layer->Inserted(this, old_layer);

  layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
  layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived);
  // Adopt the new top layer's state; signals only if it differs from ours.
  StateChangeInt(layer->state());

  return NS_OK;
}

// Pushes every layer in |layers| onto the flow in queue order, so the last
// queued layer ends up at the front (top) of the flow.
//
// This is all-or-nothing: if any layer fails to initialize, the layers not
// yet pushed AND every layer already in the flow are destroyed and the flow
// moves to TS_ERROR.
//
// The layers in |layers| are consumed on every path: they are either moved
// into |layers_| or deleted.
//
// Returns:
//   NS_ERROR_INVALID_ARG  if |layers| is empty.
//   NS_ERROR_FAILURE      if the flow is already in TS_ERROR.
//   layer->Init()'s failure code if a layer fails to initialize.
//   NS_OK                 on success.
nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers) {
  CheckThread();

  MOZ_ASSERT(!layers->empty());
  if (layers->empty()) {
    MOZ_MTLOG(ML_ERROR, id_ << ": Can't call PushLayers with empty layers");
    return NS_ERROR_INVALID_ARG;
  }

  // Don't allow pushes once we are in error state.
  if (state_ == TransportLayer::TS_ERROR) {
    MOZ_MTLOG(ML_ERROR,
              id_ << ": Can't call PushLayers in error state for flow ");
    ClearLayers(layers.get());
    return NS_ERROR_FAILURE;
  }

  nsresult rv = NS_OK;

  // Disconnect all the old signals.
  disconnect_all();

  // Most recently processed layer. Initialized explicitly so it never holds
  // an indeterminate value; after a fully successful loop it is the new top.
  TransportLayer *layer = nullptr;

  while (!layers->empty()) {
    TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
    layer = layers->front();

    rv = layer->Init();
    if (NS_FAILED(rv)) {
      MOZ_MTLOG(ML_ERROR,
                id_ << ": Layer initialization failed; invalidating flow ");
      // The failed layer is still at the front of |layers| and is cleaned up
      // below together with the ones that were never attempted.
      break;
    }

    EnsureSameThread(layer);

    // Push the layer onto the queue.
    layers_->push_front(layer);
    layers->pop();
    layer->Inserted(this, old_layer);
  }

  if (NS_FAILED(rv)) {
    // Destroy any layers we could not push.
    ClearLayers(layers.get());

    // Now destroy the rest of the flow, because it's no longer
    // in an acceptable state.
    ClearLayers(layers_.get());

    // Set ourselves to have failed.
    StateChangeInt(TransportLayer::TS_ERROR);

    // Return failure.
    return rv;
  }

  // |layers| was non-empty and every Init() succeeded, so the loop ran at
  // least once and |layer| is the new top.
  MOZ_ASSERT(layer);

  // Finally, attach ourselves to the top layer.
  layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
  layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived);
  StateChangeInt(layer->state());  // Signals if the state changes.

  return NS_OK;
}

// Returns the layer at the front of the flow, or nullptr when the flow holds
// no layers. The returned pointer is not owning.
TransportLayer *TransportFlow::top() const {
  CheckThread();

  if (layers_->empty()) {
    return nullptr;
  }
  return layers_->front();
}

// Returns the first layer, searching from the front of the flow, whose id()
// equals |id|, or nullptr if no layer matches.
TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
  CheckThread();

  for (TransportLayer *candidate : *layers_) {
    if (candidate->id() == id) {
      return candidate;
    }
  }

  return nullptr;
}

// Returns the flow's current state after checking the calling thread.
TransportLayer::State TransportFlow::state() {
  CheckThread();

  const TransportLayer::State current = state_;
  return current;
}

// Sends |len| bytes starting at |data| through the top layer of the flow.
//
// Returns TE_ERROR without sending when the flow is not in TS_OPEN or has no
// layers; otherwise returns whatever the top layer's SendPacket returns.
TransportResult TransportFlow::SendPacket(const unsigned char *data,
                                          size_t len) {
  CheckThread();

  if (state_ != TransportLayer::TS_OPEN) {
    return TE_ERROR;
  }

  // Look the top layer up once instead of once for the null test and again
  // for the call.
  TransportLayer *top_layer = top();
  return top_layer ? top_layer->SendPacket(data, len) : TE_ERROR;
}

// Reports whether |layer| (compared by pointer identity) is one of the
// layers currently held by the flow. Tolerates a null |layers_| and, unlike
// top()/GetLayer()/state(), performs no thread check.
bool TransportFlow::Contains(TransportLayer *layer) const {
  if (!layers_) {
    return false;
  }

  for (auto it = layers_->begin(); it != layers_->end(); ++it) {
    if (*it == layer) {
      return true;
    }
  }
  return false;
}

// Enforces that all layers with a thread binding share the same binding as
// the flow. Crashes if |layer| is bound to a different target than the flow.
void TransportFlow::EnsureSameThread(TransportLayer *layer)  {
  if (!target_) {
    // No binding recorded yet: adopt whatever the layer reports.
    target_ = layer->GetThread();
    return;
  }

  const nsCOMPtr<nsIEventTarget>& layer_thread = layer->GetThread();

  // A layer without a binding is acceptable; a different binding is fatal.
  if (layer_thread && (layer_thread != target_)) {
    MOZ_CRASH();
  }
}

// Records |state| as the flow's state and emits SignalStateChange, but only
// when it differs from the current state; a repeat of the same state is a
// silent no-op.
void TransportFlow::StateChangeInt(TransportLayer::State state) {
  CheckThread();

  if (state != state_) {
    state_ = state;
    SignalStateChange(this, state_);
  }
}

// Slot connected to a layer's SignalStateChange by PushLayer/PushLayers.
// |layer| is not consulted; the reported state is forwarded to
// StateChangeInt.
void TransportFlow::StateChange(TransportLayer *layer,
                                TransportLayer::State state) {
  CheckThread();
  StateChangeInt(state);
}

// Slot connected to a layer's SignalPacketReceived by PushLayer/PushLayers.
// Re-emits the packet on the flow's own SignalPacketReceived with the flow
// as sender; |layer| is not consulted.
void TransportFlow::PacketReceived(TransportLayer* layer,
                                   const unsigned char *data,
                                   size_t len) {
  CheckThread();
  SignalPacketReceived(this, data, len);
}

}  // close namespace