new file mode 100644
--- /dev/null
+++ b/netwerk/sctp/datachannel/DataChannel.cpp
@@ -0,0 +1,1973 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <iostream>
+#if !defined(__Userspace_os_Windows)
+#include <arpa/inet.h>
+#endif
+
+#define SCTP_DEBUG 1
+#define SCTP_STDINT_INCLUDE "mozilla/StandardInteger.h"
+#include "usrsctp.h"
+
+#include "DataChannelLog.h"
+
+#include "nsServiceManagerUtils.h"
+#include "nsIObserverService.h"
+#include "nsIObserver.h"
+#include "mozilla/Services.h"
+#include "nsThreadUtils.h"
+#include "nsAutoPtr.h"
+#include "nsNetUtil.h"
+#ifdef MOZ_PEERCONNECTION
+#include "mtransport/runnable_utils.h"
+#endif
+#include "DataChannel.h"
+#include "DataChannelProtocol.h"
+
+#ifdef PR_LOGGING
+PRLogModuleInfo* dataChannelLog = PR_NewLogModule("DataChannel");
+#endif
+
+static bool sctp_initialized;
+
+namespace mozilla {
+
+class DataChannelShutdown;
+nsCOMPtr<DataChannelShutdown> gDataChannelShutdown;
+
+class DataChannelShutdown : public nsIObserver
+{
+public:
+ // This needs to be tied to some form object that is guaranteed to be
+ // around (singleton likely) unless we want to shutdown sctp whenever
+ // we're not using it (and in which case we'd keep a refcnt'd object
+ // ref'd by each DataChannelConnection to release the SCTP usrlib via
+ // sctp_finish)
+
+ NS_DECL_ISUPPORTS
+
+ DataChannelShutdown() {}
+
+ void Init()
+ {
+ nsCOMPtr<nsIObserverService> observerService =
+ mozilla::services::GetObserverService();
+ if (!observerService)
+ return;
+
+ nsresult rv = observerService->AddObserver(this,
+ "profile-change-net-teardown",
+ false);
+ MOZ_ASSERT(rv == NS_OK);
+ (void) rv;
+ }
+
+ virtual ~DataChannelShutdown()
+ {
+ nsCOMPtr<nsIObserverService> observerService =
+ mozilla::services::GetObserverService();
+ if (observerService)
+ observerService->RemoveObserver(this, "profile-change-net-teardown");
+ }
+
+ NS_IMETHODIMP Observe(nsISupports* aSubject, const char* aTopic,
+ const PRUnichar* aData) {
+ if (strcmp(aTopic, "profile-change-net-teardown") == 0) {
+ LOG(("Shutting down SCTP"));
+ if (sctp_initialized) {
+ usrsctp_finish();
+ sctp_initialized = false;
+ }
+ }
+ return NS_OK;
+ }
+};
+
+NS_IMPL_ISUPPORTS1(DataChannelShutdown, nsIObserver);
+
+
+BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
+ uint32_t length) : mLength(length)
+{
+ mSpa = new sctp_sendv_spa;
+ *mSpa = spa;
+ char *tmp = new char[length]; // infallible malloc!
+ memcpy(tmp, data, length);
+ mData = tmp;
+}
+
+BufferedMsg::~BufferedMsg()
+{
+ delete mSpa;
+ delete mData;
+}
+
+static int
+receive_cb(struct socket* sock, union sctp_sockstore addr,
+ void *data, size_t datalen,
+ struct sctp_rcvinfo rcv, int flags, void *ulp_info)
+{
+ DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
+ return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
+}
+
+DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
+ mLock("netwerk::sctp::DataChannel")
+{
+ mState = CLOSED;
+ mSocket = nullptr;
+ mMasterSocket = nullptr;
+ mListener = listener;
+ mLocalPort = 0;
+ mRemotePort = 0;
+ mDeferTimeout = 10;
+ mTimerRunning = false;
+ LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener));
+}
+
+DataChannelConnection::~DataChannelConnection()
+{
+ // XXX Move CloseAll() to a Destroy() call
+ // Though it's probably ok to do this and close the sockets;
+ // if we really want it to do true clean shutdowns it can
+ // create a dependant Internal object that would remain around
+ // until the network shut down the association or timed out.
+ CloseAll();
+ if (mSocket && mSocket != mMasterSocket)
+ usrsctp_close(mSocket);
+ if (mMasterSocket)
+ usrsctp_close(mMasterSocket);
+}
+
+NS_IMPL_THREADSAFE_ISUPPORTS1(DataChannelConnection,
+ nsITimerCallback)
+
+bool
+DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
+{
+ struct sctp_initmsg initmsg;
+ struct sctp_udpencaps encaps;
+ struct sctp_assoc_value av;
+ struct sctp_event event;
+ socklen_t len;
+
+ uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
+ SCTP_PEER_ADDR_CHANGE,
+ SCTP_REMOTE_ERROR,
+ SCTP_SHUTDOWN_EVENT,
+ SCTP_ADAPTATION_INDICATION,
+ SCTP_SEND_FAILED_EVENT,
+ SCTP_STREAM_RESET_EVENT,
+ SCTP_STREAM_CHANGE_EVENT};
+ {
+ MOZ_ASSERT(NS_IsMainThread());
+
+ // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
+ if (!sctp_initialized) {
+ if (aUsingDtls) {
+ LOG(("sctp_init(DTLS)"));
+#ifdef MOZ_PEERCONNECTION
+ usrsctp_init(0, DataChannelConnection::SctpDtlsOutput);
+#else
+ NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
+#endif
+ } else {
+ LOG(("sctp_init(%d)", aPort));
+ usrsctp_init(aPort, nullptr);
+ }
+
+ usrsctp_sysctl_set_sctp_debug_on(0 /* SCTP_DEBUG_ALL */);
+ usrsctp_sysctl_set_sctp_blackhole(2);
+ sctp_initialized = true;
+
+ gDataChannelShutdown = new DataChannelShutdown();
+ gDataChannelShutdown->Init();
+ }
+ }
+
+ // Open sctp association across tunnel
+ if ((mMasterSocket = usrsctp_socket(
+ aUsingDtls ? AF_CONN : AF_INET,
+ SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) {
+ return false;
+ }
+
+ if (!aUsingDtls) {
+ memset(&encaps, 0, sizeof(encaps));
+ encaps.sue_address.ss_family = AF_INET;
+ encaps.sue_port = htons(aPort);
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
+ (const void*)&encaps,
+ (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
+ LOG(("*** failed encaps errno %d", errno));
+ goto error_cleanup;
+ }
+ LOG(("SCTP encapsulation local port %d", aPort));
+ }
+
+ av.assoc_id = SCTP_ALL_ASSOC;
+ av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
+ (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
+ LOG(("*** failed enable stream reset errno %d", errno));
+ goto error_cleanup;
+ }
+
+ /* Enable the events of interest. */
+ memset(&event, 0, sizeof(event));
+ event.se_assoc_id = SCTP_ALL_ASSOC;
+ event.se_on = 1;
+ for (uint32_t i = 0; i < sizeof(event_types)/sizeof(event_types[0]); ++i) {
+ event.se_type = event_types[i];
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
+ LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
+ goto error_cleanup;
+ }
+ }
+
+ // Update number of streams
+ mStreamsOut.AppendElements(aNumStreams);
+ mStreamsIn.AppendElements(aNumStreams); // make sure both are the same length
+ for (uint32_t i = 0; i < aNumStreams; ++i) {
+ mStreamsOut[i] = nullptr;
+ mStreamsIn[i] = nullptr;
+ }
+ memset(&initmsg, 0, sizeof(initmsg));
+ len = sizeof(initmsg);
+ if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
+ LOG(("*** failed getsockopt SCTP_INITMSG"));
+ goto error_cleanup;
+ }
+ LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
+ initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
+ initmsg.sinit_num_ostreams = aNumStreams;
+ initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
+ (socklen_t)sizeof(initmsg)) < 0) {
+ LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
+ goto error_cleanup;
+ }
+
+ mSocket = nullptr;
+ return true;
+
+error_cleanup:
+ usrsctp_close(mMasterSocket);
+ mMasterSocket = nullptr;
+ return false;
+}
+
+void
+DataChannelConnection::StartDefer()
+{
+ nsresult rv;
+ if (!NS_IsMainThread()) {
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::START_DEFER,
+ this, nullptr));
+ return;
+ }
+
+ MOZ_ASSERT(NS_IsMainThread());
+ if (!mDeferredTimer) {
+ mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv);
+ MOZ_ASSERT(mDeferredTimer);
+ }
+
+ if (!mTimerRunning) {
+ rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
+ nsITimer::TYPE_ONE_SHOT);
+ NS_ENSURE_TRUE(rv == NS_OK, /* */);
+
+ mTimerRunning = true;
+ }
+}
+
+// nsITimerCallback
+
+NS_IMETHODIMP
+DataChannelConnection::Notify(nsITimer *timer)
+{
+ MOZ_ASSERT(NS_IsMainThread());
+ LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout));
+
+ if (timer == mDeferredTimer) {
+ if (SendDeferredMessages()) {
+ // Still blocked
+ // we don't need a lock, since this must be main thread...
+ nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
+ nsITimer::TYPE_ONE_SHOT);
+ if (NS_FAILED(rv)) {
+ LOG(("%s: cannot initialize open timer", __FUNCTION__));
+ // XXX and do....?
+ return rv;
+ }
+ mTimerRunning = true;
+ } else {
+ LOG(("Turned off deferred send timer"));
+ mTimerRunning = false;
+ }
+ }
+ return NS_OK;
+}
+
+#ifdef MOZ_PEERCONNECTION
+bool
+DataChannelConnection::ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
+{
+ LOG(("Connect DTLS local %d, remote %d", localport, remoteport));
+
+ NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectDTLS!");
+ NS_ENSURE_TRUE(aFlow, false);
+
+ mTransportFlow = aFlow;
+ mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::PacketReceived);
+ mLocalPort = localport;
+ mRemotePort = remoteport;
+
+ PR_CreateThread(
+ PR_SYSTEM_THREAD,
+ DataChannelConnection::DTLSConnectThread, this,
+ PR_PRIORITY_NORMAL,
+ PR_GLOBAL_THREAD,
+ PR_JOINABLE_THREAD, 0
+ );
+
+ return true; // not finished yet
+}
+
+/* static */
+void
+DataChannelConnection::DTLSConnectThread(void *data)
+{
+ DataChannelConnection *_this = static_cast<DataChannelConnection*>(data);
+ struct sockaddr_conn addr;
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sconn_family = AF_CONN;
+#if !defined(__Userspace_os_Linux) && !defined(__Userspace_os_Windows)
+ addr.sconn_len = sizeof(addr);
+#endif
+ addr.sconn_port = htons(_this->mLocalPort);
+
+ int r = usrsctp_bind(_this->mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
+ sizeof(addr));
+ if (r < 0) {
+ LOG(("usrsctp_bind failed: %d", r));
+ return;
+ }
+
+ // This is the remote addr
+ addr.sconn_port = htons(_this->mRemotePort);
+ addr.sconn_addr = static_cast<void *>(_this);
+ r = usrsctp_connect(_this->mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
+ sizeof(addr));
+ if (r < 0) {
+ LOG(("usrsctp_connect failed: %d", r));
+ return;
+ }
+
+ // Notify Connection open
+ LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, _this));
+ _this->mSocket = _this->mMasterSocket;
+ _this->mState = OPEN;
+ LOG(("DTLS connect() succeeded! Entering connected mode"));
+
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CONNECTION,
+ _this, nullptr));
+}
+
+void
+DataChannelConnection::PacketReceived(TransportFlow *flow,
+ const unsigned char *data, size_t len)
+{
+ //LOG(("%p: SCTP/DTLS received %ld bytes", this, len));
+
+ // Pass the data to SCTP
+ usrsctp_conninput(static_cast<void *>(this), data, len, 0);
+}
+
+// XXX Merge with SctpDtlsOutput?
+int
+DataChannelConnection::SendPacket(const unsigned char *data, size_t len)
+{
+ //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
+ return mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
+}
+
+/* static */
+int
+DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
+ uint8_t tos, uint8_t set_df)
+{
+ DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
+
+ return peer->SendPacket(static_cast<unsigned char *>(buffer), length);
+}
+#endif
+
+// listen for incoming associations
+// Blocks! - Don't call this from main thread!
+bool
+DataChannelConnection::Listen(unsigned short port)
+{
+ struct sockaddr_in addr;
+ socklen_t addr_len;
+
+ NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!");
+
+ /* Acting as the 'server' */
+ memset((void *)&addr, 0, sizeof(addr));
+#ifdef HAVE_SIN_LEN
+ addr.sin_len = sizeof(struct sockaddr_in);
+#endif
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = htonl(INADDR_ANY);
+ LOG(("Waiting for connections on port %d", ntohs(addr.sin_port)));
+ mState = CONNECTING;
+ if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
+ LOG(("***Failed userspace_bind"));
+ return false;
+ }
+ if (usrsctp_listen(mMasterSocket, 1) < 0) {
+ LOG(("***Failed userspace_listen"));
+ return false;
+ }
+
+ LOG(("Accepting connection"));
+ addr_len = 0;
+ if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
+ LOG(("***Failed accept"));
+ return false;
+ }
+ mState = OPEN;
+
+ // Notify Connection open
+ // XXX We need to make sure connection sticks around until the message is delivered
+ LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CONNECTION,
+ this, nullptr));
+ return true;
+}
+
+// Blocks! - Don't call this from main thread!
+bool
+DataChannelConnection::Connect(const char *addr, unsigned short port)
+{
+ struct sockaddr_in addr4;
+ struct sockaddr_in6 addr6;
+
+ NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!");
+
+ /* Acting as the connector */
+ LOG(("Connecting to %s, port %u", addr, port));
+ memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
+ memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
+#ifdef HAVE_SIN_LEN
+ addr4.sin_len = sizeof(struct sockaddr_in);
+#endif
+#ifdef HAVE_SIN6_LEN
+ addr6.sin6_len = sizeof(struct sockaddr_in6);
+#endif
+ addr4.sin_family = AF_INET;
+ addr6.sin6_family = AF_INET6;
+ addr4.sin_port = htons(port);
+ addr6.sin6_port = htons(port);
+ mState = CONNECTING;
+
+#if !defined(__Userspace_os_Windows)
+ if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
+ if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
+ LOG(("*** Failed userspace_connect"));
+ return false;
+ }
+ } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
+ if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
+ LOG(("*** Failed userspace_connect"));
+ return false;
+ }
+ } else {
+ LOG(("*** Illegal destination address."));
+ }
+#else
+ {
+ struct sockaddr_storage ss;
+ int sslen = sizeof(ss);
+
+ if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
+ addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
+ if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
+ LOG(("*** Failed userspace_connect"));
+ return false;
+ }
+ } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
+ addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
+ if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
+ LOG(("*** Failed userspace_connect"));
+ return false;
+ }
+ } else {
+ LOG(("*** Illegal destination address."));
+ }
+ }
+#endif
+
+ mSocket = mMasterSocket;
+
+ LOG(("connect() succeeded! Entering connected mode"));
+ mState = OPEN;
+
+ // Notify Connection open
+ // XXX We need to make sure connection sticks around until the message is delivered
+ LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CONNECTION,
+ this, nullptr));
+ return true;
+}
+
+DataChannel *
+DataChannelConnection::FindChannelByStreamIn(uint16_t streamIn)
+{
+ // Auto-extend mStreamsIn as needed
+ if (((uint32_t) streamIn) + 1 > mStreamsIn.Length()) {
+ uint32_t old_len = mStreamsIn.Length();
+ LOG(("Extending mStreamsIn[] to %d elements", ((int32_t) streamIn)+1));
+ mStreamsIn.AppendElements((streamIn+1) - mStreamsIn.Length());
+ for (uint32_t i = old_len; i < mStreamsIn.Length(); ++i)
+ mStreamsIn[i] = nullptr;
+ }
+ // Should always be safe in practice
+ return mStreamsIn.SafeElementAt(streamIn);
+}
+
+DataChannel *
+DataChannelConnection::FindChannelByStreamOut(uint16_t streamOut)
+{
+ return mStreamsOut.SafeElementAt(streamOut);
+}
+
+uint16_t
+DataChannelConnection::FindFreeStreamOut()
+{
+ uint32_t i, limit;
+
+ limit = mStreamsOut.Length();
+ if (limit > MAX_NUM_STREAMS)
+ limit = MAX_NUM_STREAMS;
+ for (i = 0; i < limit; ++i) {
+ if (!mStreamsOut[i]) {
+ break;
+ }
+ }
+ if (i == limit) {
+ return INVALID_STREAM;
+ }
+ return i;
+}
+
+bool
+DataChannelConnection::RequestMoreStreamsOut(int32_t aNeeded)
+{
+ struct sctp_status status;
+ struct sctp_add_streams sas;
+ uint32_t outStreamsNeeded;
+ socklen_t len;
+
+ if (aNeeded + mStreamsOut.Length() > MAX_NUM_STREAMS)
+ aNeeded = MAX_NUM_STREAMS - mStreamsOut.Length();
+ if (aNeeded <= 0)
+ return false;
+
+ len = (socklen_t)sizeof(struct sctp_status);
+ if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
+ LOG(("***failed: getsockopt SCTP_STATUS"));
+ return false;
+ }
+ outStreamsNeeded = aNeeded; // number to add
+
+ memset(&sas, 0, sizeof(struct sctp_add_streams));
+ sas.sas_instrms = 0;
+ sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
+ // Doesn't block, we get an event when it succeeds or fails
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
+ (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
+ if (errno == EALREADY)
+ return true;
+
+ LOG(("***failed: setsockopt ADD errno=%d", errno));
+ return false;
+ }
+ LOG(("Requested %u more streams", outStreamsNeeded));
+ return true;
+}
+
+int32_t
+DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t streamOut)
+{
+ struct sctp_sndinfo sndinfo;
+
+ // Note: Main-thread IO, but doesn't block
+ memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
+ sndinfo.snd_sid = streamOut;
+ sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
+ if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
+ &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
+ SCTP_SENDV_SNDINFO, 0) < 0) {
+ //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
+ return (0);
+ }
+ return (1);
+}
+
+int32_t
+DataChannelConnection::SendOpenResponseMessage(uint16_t streamOut, uint16_t streamIn)
+{
+ struct rtcweb_datachannel_open_response rsp;
+
+ memset(&rsp, 0, sizeof(struct rtcweb_datachannel_open_response));
+ rsp.msg_type = DATA_CHANNEL_OPEN_RESPONSE;
+ rsp.reverse_stream = htons(streamIn);
+
+ return SendControlMessage(&rsp, sizeof(rsp), streamOut);
+}
+
+
+int32_t
+DataChannelConnection::SendOpenAckMessage(uint16_t streamOut)
+{
+ struct rtcweb_datachannel_ack ack;
+
+ memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
+ ack.msg_type = DATA_CHANNEL_ACK;
+
+ return SendControlMessage(&ack, sizeof(ack), streamOut);
+}
+
+int32_t
+DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
+ uint16_t streamOut, bool unordered,
+ uint16_t prPolicy, uint32_t prValue)
+{
+ int len = label.Length(); // not including nul
+ struct rtcweb_datachannel_open_request *req =
+ (struct rtcweb_datachannel_open_request*) moz_xmalloc(sizeof(*req)+len);
+ // careful - ok because request includes 1 char label
+
+ memset(req, 0, sizeof(struct rtcweb_datachannel_open_request));
+ req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
+ switch (prPolicy) {
+ case SCTP_PR_SCTP_NONE:
+ req->channel_type = DATA_CHANNEL_RELIABLE;
+ break;
+ case SCTP_PR_SCTP_TTL:
+ req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
+ break;
+ case SCTP_PR_SCTP_RTX:
+ req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
+ break;
+ default:
+ // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno!
+ moz_free(req);
+ return (0);
+ }
+ req->flags = htons(0);
+ if (unordered) {
+ req->flags |= htons(DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED);
+ }
+ req->reliability_params = htons((uint16_t)prValue); /* XXX Why 16-bit */
+ req->priority = htons(0); /* XXX: add support */
+ strcpy(&req->label[0], PromiseFlatCString(label).get());
+
+ int32_t result = SendControlMessage(req, sizeof(*req)+len, streamOut);
+
+ moz_free(req);
+ return result;
+}
+
+// XXX This should use a separate thread (outbound queue) which should
+// select() to know when to *try* to send data to the socket again.
+// Alternatively, it can use a timeout, but that's guaranteed to be wrong
+// (just not sure in what direction). We could re-implement NSPR's
+// PR_POLL_WRITE/etc handling... with a lot of work.
+
+// returns if we're still blocked or not
+bool
+DataChannelConnection::SendDeferredMessages()
+{
+ uint32_t i;
+ DataChannel *channel;
+ bool still_blocked = false;
+ bool sent = false;
+
+ // This may block while something is modifying channels, but should not block for IO
+ MutexAutoLock lock(mLock);
+
+ // XXX For total fairness, on a still_blocked we'd start next time at the
+ // same index. Sorry, not going to bother for now.
+ for (i = 0; i < mStreamsOut.Length(); ++i) {
+ channel = mStreamsOut[i];
+ if (!channel)
+ continue;
+
+ // Only one of these should be set....
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
+ if (SendOpenRequestMessage(channel->mLabel, channel->mStreamOut,
+ channel->mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED,
+ channel->mPrPolicy, channel->mPrValue)) {
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
+ sent = true;
+ } else {
+ if (errno == EAGAIN) {
+ still_blocked = true;
+ } else {
+ // Close the channel, inform the user
+ mStreamsOut[channel->mStreamOut] = nullptr;
+ channel->mState = CLOSED;
+ // Don't need to reset; we didn't open it
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
+ channel));
+ }
+ }
+ }
+ if (still_blocked)
+ break;
+
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_RSP) {
+ if (SendOpenResponseMessage(channel->mStreamOut, channel->mStreamIn)) {
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_RSP;
+ sent = true;
+ } else {
+ if (errno == EAGAIN) {
+ still_blocked = true;
+ } else {
+ // Close the channel
+ // Don't need to reset; we didn't open it
+ // The other side may be left with a hanging Open. Our inability to
+ // send the open response means we can't easily tell them about it
+ // We haven't informed the user/DOM of the creation yet, so just
+ // delete the channel.
+ mStreamsIn[channel->mStreamIn] = nullptr;
+ mStreamsOut[channel->mStreamOut] = nullptr;
+ delete channel;
+ }
+ }
+ }
+ if (still_blocked)
+ break;
+
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
+ if (SendOpenAckMessage(channel->mStreamOut)) {
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
+ sent = true;
+ } else {
+ if (errno == EAGAIN) {
+ still_blocked = true;
+ } else {
+ // Close the channel, inform the user
+ Close(channel->mStreamOut);
+ }
+ }
+ }
+ if (still_blocked)
+ break;
+
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
+ bool failed_send = false;
+ int32_t result;
+
+ if (channel->mState == CLOSED || channel->mState == CLOSING) {
+ channel->mBufferedData.Clear();
+ }
+ while (!channel->mBufferedData.IsEmpty() &&
+ !failed_send) {
+ struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
+ const char *data = channel->mBufferedData[0]->mData;
+ uint32_t len = channel->mBufferedData[0]->mLength;
+
+ // SCTP will return EMSGSIZE if the message is bigger than the buffer
+ // size (or EAGAIN if there isn't space)
+ if ((result = usrsctp_sendv(mSocket, data, len,
+ nullptr, 0,
+ (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
+ SCTP_SENDV_SPA,
+ spa->sendv_sndinfo.snd_flags) < 0)) {
+ if (errno == EAGAIN) {
+ // leave queued for resend
+ failed_send = true;
+ LOG(("queue full again when resending %d bytes (%d)", len, result));
+ } else {
+ LOG(("error %d re-sending string", errno));
+ failed_send = true;
+ }
+ } else {
+ LOG(("Resent buffer of %d bytes (%d)", len, result));
+ sent = true;
+ channel->mBufferedData.RemoveElementAt(0);
+ }
+ }
+ if (channel->mBufferedData.IsEmpty())
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
+ else
+ still_blocked = true;
+ }
+ if (still_blocked)
+ break;
+ }
+
+ if (!still_blocked) {
+ // mDeferTimeout becomes an estimate of how long we need to wait next time we block
+ return false;
+ }
+ // adjust time? More time for next wait if we didn't send anything, less if did
+ // Pretty crude, but better than nothing; just to keep CPU use down
+ if (!sent && mDeferTimeout < 50)
+ mDeferTimeout++;
+ else if (sent && mDeferTimeout > 10)
+ mDeferTimeout--;
+
+ return true;
+}
+
+void
+DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
+ size_t length,
+ uint16_t streamIn)
+{
+ DataChannel *channel;
+ uint32_t prValue;
+ uint16_t prPolicy;
+ uint32_t flags;
+ nsCString label(nsDependentCString(req->label));
+
+ mLock.AssertCurrentThreadOwns();
+
+ if ((channel = FindChannelByStreamIn(streamIn))) {
+ LOG(("ERROR: HandleOpenRequestMessage: channel for stream %d is in state %d instead of CLOSED.",
+ streamIn, channel->mState));
+ /* XXX: some error handling */
+ return;
+ }
+ switch (req->channel_type) {
+ case DATA_CHANNEL_RELIABLE:
+ prPolicy = SCTP_PR_SCTP_NONE;
+ break;
+ case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
+ prPolicy = SCTP_PR_SCTP_RTX;
+ break;
+ case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
+ prPolicy = SCTP_PR_SCTP_TTL;
+ break;
+ default:
+ /* XXX error handling */
+ return;
+ }
+ prValue = ntohs(req->reliability_params);
+ flags = ntohs(req->flags) & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED;
+ channel = new DataChannel(this, INVALID_STREAM, streamIn,
+ DataChannel::CONNECTING,
+ label,
+ prPolicy, prValue,
+ flags,
+ nullptr, nullptr);
+ mStreamsIn[streamIn] = channel;
+
+ OpenResponseFinish(channel);
+}
+
+void
+DataChannelConnection::OpenResponseFinish(DataChannel *channel)
+{
+ uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM!
+
+ mLock.AssertCurrentThreadOwns();
+
+ LOG(("Finished response: channel %p, streamOut = %u", channel, streamOut));
+
+ if (streamOut == INVALID_STREAM) {
+ if (!RequestMoreStreamsOut()) {
+ /* XXX: Signal error to the other end. */
+ mStreamsIn[channel->mStreamIn] = nullptr;
+ // we can do this with the lock held because mStreamOut is INVALID_STREAM,
+ // so there's no outbound channel to reset
+ delete channel;
+ return;
+ }
+ LOG(("Queuing channel %p to finish response", channel));
+ channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_RSP;
+ mPending.Push(channel);
+ // can't notify the user until we can send an OpenResponse
+ } else {
+ channel->mStreamOut = streamOut;
+ mStreamsOut[streamOut] = channel;
+ if (SendOpenResponseMessage(streamOut, channel->mStreamIn)) {
+ LOG(("successful incoming open of '%s' in: %u, out: %u\n",
+ channel->mLabel.get(), channel->mStreamIn, streamOut));
+
+ /* Notify ondatachannel */
+ // XXX We need to make sure connection sticks around until the message is delivered
+ LOG(("%s: sending ON_CHANNEL_CREATED for %p", __FUNCTION__, channel));
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
+ this, channel));
+ } else {
+ if (errno == EAGAIN) {
+ channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_RSP;
+ StartDefer();
+ } else {
+ /* XXX: Signal error to the other end. */
+ mStreamsIn[channel->mStreamIn] = nullptr;
+ mStreamsOut[streamOut] = nullptr;
+ channel->mStreamOut = INVALID_STREAM;
+ // we can do this with the lock held because mStreamOut is INVALID_STREAM,
+ // so there's no outbound channel to reset (we failed to send on it)
+ delete channel;
+ return; // paranoia against future changes since we unlocked
+ }
+ }
+ }
+}
+
+
+void
+DataChannelConnection::HandleOpenResponseMessage(const struct rtcweb_datachannel_open_response *rsp,
+ size_t length, uint16_t streamIn)
+{
+ uint16_t streamOut;
+ DataChannel *channel;
+
+ mLock.AssertCurrentThreadOwns();
+
+ streamOut = ntohs(rsp->reverse_stream);
+ channel = FindChannelByStreamOut(streamOut);
+
+ NS_ENSURE_TRUE(channel != nullptr, /* */);
+ NS_ENSURE_TRUE(channel->mState == CONNECTING, /* */);
+
+ if (rsp->error) {
+ LOG(("%s: error in response to open of channel %d (%s)",
+ __FUNCTION__, streamOut, channel->mLabel.get()));
+
+ } else {
+ NS_ENSURE_TRUE(!FindChannelByStreamIn(streamIn), /* */);
+
+ channel->mStreamIn = streamIn;
+ channel->mState = OPEN;
+ channel->mReady = true;
+ mStreamsIn[streamIn] = channel;
+ if (SendOpenAckMessage(streamOut)) {
+ channel->mFlags = 0;
+ } else {
+ // XXX Only on EAGAIN!? And if not, then close the channel??
+ channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
+ StartDefer();
+ }
+ LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel));
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
+ channel));
+ }
+}
+
+void
+DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
+ size_t length, uint16_t streamIn)
+{
+ DataChannel *channel;
+
+ mLock.AssertCurrentThreadOwns();
+
+ channel = FindChannelByStreamIn(streamIn);
+
+ NS_ENSURE_TRUE(channel != nullptr, /* */);
+ NS_ENSURE_TRUE(channel->mState == CONNECTING, /* */);
+
+ channel->mState = channel->mReady ? DataChannel::OPEN : DataChannel::WAITING_TO_OPEN;
+ if (channel->mState == OPEN) {
+ LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel));
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
+ channel));
+ } else {
+ LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel));
+ }
+}
+
+void
+DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t streamIn)
+{
+ /* XXX: Send an error message? */
+ LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, streamIn));
+ // XXX Log to JS error console if possible
+}
+
+void
+DataChannelConnection::HandleDataMessage(uint32_t ppid,
+ const void *data, size_t length,
+ uint16_t streamIn)
+{
+ DataChannel *channel;
+ const char *buffer = (const char *) data;
+
+ mLock.AssertCurrentThreadOwns();
+
+ channel = FindChannelByStreamIn(streamIn);
+
+ // XXX A closed channel may trip this... check
+ NS_ENSURE_TRUE(channel != nullptr, /* */);
+ NS_ENSURE_TRUE(channel->mState != CONNECTING, /* */);
+
+ // XXX should this be a simple if, no warnings/debugbreaks?
+ NS_ENSURE_TRUE(channel->mState != CLOSED, /* */);
+
+ {
+ nsAutoCString recvData(buffer, length);
+
+ switch (ppid) {
+ case DATA_CHANNEL_PPID_DOMSTRING:
+ LOG(("DataChannel: String message received of length %lu on channel %d: %.*s",
+ length, channel->mStreamOut, (int)PR_MIN(length, 80), buffer));
+ length = -1; // Flag for DOMString
+
+ // WebSockets checks IsUTF8() here; we can try to deliver it
+
+ NS_WARN_IF_FALSE(channel->mBinaryBuffer.IsEmpty(), "Binary message aborted by text message!");
+ if (!channel->mBinaryBuffer.IsEmpty())
+ channel->mBinaryBuffer.Truncate(0);
+ break;
+
+ case DATA_CHANNEL_PPID_BINARY:
+ channel->mBinaryBuffer += recvData;
+ LOG(("DataChannel: Received binary message of length %lu (total %u) on channel id %d",
+ length, channel->mBinaryBuffer.Length(), channel->mStreamOut));
+ return; // Not ready to notify application
+
+ case DATA_CHANNEL_PPID_BINARY_LAST:
+ LOG(("DataChannel: Received binary message of length %lu on channel id %d",
+ length, channel->mStreamOut));
+ if (!channel->mBinaryBuffer.IsEmpty()) {
+ channel->mBinaryBuffer += recvData;
+ LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
+ SendOrQueue(channel,
+ new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_DATA, this,
+ channel, channel->mBinaryBuffer,
+ channel->mBinaryBuffer.Length()));
+ channel->mBinaryBuffer.Truncate(0);
+ return;
+ }
+ // else send using recvData normally
+ break;
+
+ default:
+ NS_ERROR("Unknown data PPID");
+ return;
+ }
+ /* Notify onmessage */
+ LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
+ SendOrQueue(channel,
+ new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_DATA, this,
+ channel, recvData, length));
+ }
+}
+
+// Called with mLock locked!
+void
+DataChannelConnection::SendOrQueue(DataChannel *aChannel,
+ DataChannelOnMessageAvailable *aMessage)
+{
+ mLock.AssertCurrentThreadOwns();
+
+ if (!aChannel->mReady &&
+ (aChannel->mState == DataChannel::CONNECTING ||
+ aChannel->mState == DataChannel::WAITING_TO_OPEN)) {
+ aChannel->mQueuedMessages.AppendElement(aMessage);
+ } else {
+ NS_DispatchToMainThread(aMessage);
+ }
+}
+
+// Called with mLock locked!
+void
+DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t streamIn)
+{
+ const struct rtcweb_datachannel_open_request *req;
+ const struct rtcweb_datachannel_open_response *rsp;
+ const struct rtcweb_datachannel_ack *ack, *msg;
+
+ mLock.AssertCurrentThreadOwns();
+
+ switch (ppid) {
+ case DATA_CHANNEL_PPID_CONTROL:
+ NS_ENSURE_TRUE(length >= sizeof(*ack), /* */); // Ack is the smallest
+
+ msg = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
+ switch (msg->msg_type) {
+ case DATA_CHANNEL_OPEN_REQUEST:
+ LOG(("length %u, sizeof(*req) = %u", length, sizeof(*req)));
+ NS_ENSURE_TRUE(length >= sizeof(*req), /* */);
+
+ req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
+ HandleOpenRequestMessage(req, length, streamIn);
+ break;
+ case DATA_CHANNEL_OPEN_RESPONSE:
+ NS_ENSURE_TRUE(length >= sizeof(*rsp), /* */);
+
+ rsp = static_cast<const struct rtcweb_datachannel_open_response *>(buffer);
+ HandleOpenResponseMessage(rsp, length, streamIn);
+ break;
+ case DATA_CHANNEL_ACK:
+ // >= sizeof(*ack) checked above
+
+ ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
+ HandleOpenAckMessage(ack, length, streamIn);
+ break;
+ default:
+ HandleUnknownMessage(ppid, length, streamIn);
+ break;
+ }
+ break;
+ case DATA_CHANNEL_PPID_DOMSTRING:
+ case DATA_CHANNEL_PPID_BINARY:
+ case DATA_CHANNEL_PPID_BINARY_LAST:
+ HandleDataMessage(ppid, buffer, length, streamIn);
+ break;
+ default:
+ LOG(("Message of length %lu, PPID %u on stream %u received.",
+ length, ppid, streamIn));
+ break;
+ }
+}
+
+void
+DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
+{
+ uint32_t i, n;
+
+ switch (sac->sac_state) {
+ case SCTP_COMM_UP:
+ LOG(("Association change: SCTP_COMM_UP"));
+ break;
+ case SCTP_COMM_LOST:
+ LOG(("Association change: SCTP_COMM_LOST"));
+ break;
+ case SCTP_RESTART:
+ LOG(("Association change: SCTP_RESTART"));
+ break;
+ case SCTP_SHUTDOWN_COMP:
+ LOG(("Association change: SCTP_SHUTDOWN_COMP"));
+ break;
+ case SCTP_CANT_STR_ASSOC:
+ LOG(("Association change: SCTP_CANT_STR_ASSOC"));
+ break;
+ default:
+ LOG(("Association change: UNKNOWN"));
+ break;
+ }
+ LOG(("Association change: streams (in/out) = (%u/%u)",
+ sac->sac_inbound_streams, sac->sac_outbound_streams));
+
+ NS_ENSURE_TRUE(sizeof(*sac) >= sac->sac_length, /* */);
+ n = sac->sac_length - sizeof(*sac);
+ if (((sac->sac_state == SCTP_COMM_UP) ||
+ (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
+ for (i = 0; i < n; ++i) {
+ switch (sac->sac_info[i]) {
+ case SCTP_ASSOC_SUPPORTS_PR:
+ LOG(("Supports: PR"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_AUTH:
+ LOG(("Supports: AUTH"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_ASCONF:
+ LOG(("Supports: ASCONF"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_MULTIBUF:
+ LOG(("Supports: MULTIBUF"));
+ break;
+ case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
+ LOG(("Supports: RE-CONFIG"));
+ break;
+ default:
+ LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
+ break;
+ }
+ }
+ } else if (((sac->sac_state == SCTP_COMM_LOST) ||
+ (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
+ LOG(("Association: ABORT ="));
+ for (i = 0; i < n; ++i) {
+ LOG((" 0x%02x", sac->sac_info[i]));
+ }
+ }
+ if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
+ (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
+ (sac->sac_state == SCTP_COMM_LOST)) {
+ return;
+ }
+}
+
+void
+DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
+{
+ char addr_buf[INET6_ADDRSTRLEN];
+ const char *addr = "";
+ struct sockaddr_in *sin;
+ struct sockaddr_in6 *sin6;
+#if defined(__Userspace_os_Windows)
+ DWORD addr_len = INET6_ADDRSTRLEN;
+#endif
+
+ switch (spc->spc_aaddr.ss_family) {
+ case AF_INET:
+ sin = (struct sockaddr_in *)&spc->spc_aaddr;
+#if !defined(__Userspace_os_Windows)
+ addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
+#else
+ if (WSAAddressToStringA((LPSOCKADDR)sin, sizeof(sin->sin_addr), nullptr,
+ addr_buf, &addr_len)) {
+ return;
+ }
+#endif
+ break;
+ case AF_INET6:
+ sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
+#if !defined(__Userspace_os_Windows)
+ addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
+#else
+ if (WSAAddressToStringA((LPSOCKADDR)sin6, sizeof(sin6), nullptr,
+ addr_buf, &addr_len)) {
+ return;
+ }
+#endif
+ case AF_CONN:
+ addr = "DTLS connection";
+ break;
+ default:
+ break;
+ }
+ LOG(("Peer address %s is now ", addr));
+ switch (spc->spc_state) {
+ case SCTP_ADDR_AVAILABLE:
+ LOG(("SCTP_ADDR_AVAILABLE"));
+ break;
+ case SCTP_ADDR_UNREACHABLE:
+ LOG(("SCTP_ADDR_UNREACHABLE"));
+ break;
+ case SCTP_ADDR_REMOVED:
+ LOG(("SCTP_ADDR_REMOVED"));
+ break;
+ case SCTP_ADDR_ADDED:
+ LOG(("SCTP_ADDR_ADDED"));
+ break;
+ case SCTP_ADDR_MADE_PRIM:
+ LOG(("SCTP_ADDR_MADE_PRIM"));
+ break;
+ case SCTP_ADDR_CONFIRMED:
+ LOG(("SCTP_ADDR_CONFIRMED"));
+ break;
+ default:
+ LOG(("UNKNOWN"));
+ break;
+ }
+ LOG((" (error = 0x%08x).\n", spc->spc_error));
+}
+
+void
+DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
+{
+ size_t i, n;
+
+ n = sre->sre_length - sizeof(struct sctp_remote_error);
+ LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
+ for (i = 0; i < n; ++i) {
+ LOG((" 0x%02x", sre-> sre_data[i]));
+ }
+}
+
+void
+DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
+{
+ LOG(("Shutdown event."));
+ /* XXX: notify all channels. */
+ // Attempts to actually send anything will fail
+}
+
+void
+DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
+{
+ LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
+}
+
+void
+DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
+{
+ size_t i, n;
+
+ if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
+ LOG(("Unsent "));
+ }
+ if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
+ LOG(("Sent "));
+ }
+ if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
+ LOG(("(flags = %x) ", ssfe->ssfe_flags));
+ }
+ LOG(("message with PPID = %d, SID = %d, flags: 0x%04x due to error = 0x%08x",
+ ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
+ ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
+ n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
+ for (i = 0; i < n; ++i) {
+ LOG((" 0x%02x", ssfe->ssfe_data[i]));
+ }
+}
+
+void
+DataChannelConnection::ResetOutgoingStream(uint16_t streamOut)
+{
+ uint32_t i;
+
+ mLock.AssertCurrentThreadOwns();
+ // Rarely has more than a couple items and only for a short time
+ for (i = 0; i < mStreamsResetting.Length(); ++i) {
+ if (mStreamsResetting[i] == streamOut) {
+ return;
+ }
+ }
+ mStreamsResetting.AppendElement(streamOut);
+}
+
+void
+DataChannelConnection::SendOutgoingStreamReset()
+{
+ struct sctp_reset_streams *srs;
+ uint32_t i;
+ size_t len;
+
+ mLock.AssertCurrentThreadOwns();
+ if (mStreamsResetting.IsEmpty() == 0) {
+ return;
+ }
+ len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
+ srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
+ memset(srs, 0, len);
+ srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
+ srs->srs_number_streams = mStreamsResetting.Length();
+ for (i = 0; i < mStreamsResetting.Length(); ++i) {
+ srs->srs_stream_list[i] = mStreamsResetting[i];
+ }
+ if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
+ LOG(("***failed: setsockopt RESET, errno %d", errno));
+ } else {
+ mStreamsResetting.Clear();
+ }
+ moz_free(srs);
+}
+
+void
+DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
+{
+ uint32_t n, i;
+ DataChannel *channel;
+
+ if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
+ !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
+ n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
+ for (i = 0; i < n; ++i) {
+ if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
+ channel = FindChannelByStreamIn(strrst->strreset_stream_list[i]);
+ if (channel != nullptr) {
+ mStreamsIn[channel->mStreamIn] = nullptr;
+ channel->mStreamIn = INVALID_STREAM;
+ if (channel->mStreamOut == INVALID_STREAM) {
+ channel->mPrPolicy = SCTP_PR_SCTP_NONE;
+ channel->mPrValue = 0;
+ channel->mFlags = 0;
+ channel->mState = CLOSED;
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
+ channel));
+ } else {
+ ResetOutgoingStream(channel->mStreamOut);
+ channel->mState = CLOSING;
+ }
+ }
+ }
+ if (strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
+ channel = FindChannelByStreamOut(strrst->strreset_stream_list[i]);
+ if (channel != nullptr && channel->mStreamOut != INVALID_STREAM) {
+ mStreamsOut[channel->mStreamOut] = nullptr;
+ channel->mStreamOut = INVALID_STREAM;
+ if (channel->mStreamIn == INVALID_STREAM) {
+ channel->mPrPolicy = SCTP_PR_SCTP_NONE;
+ channel->mPrValue = 0;
+ channel->mFlags = 0;
+ channel->mState = CLOSED;
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
+ channel));
+ }
+ }
+ }
+ }
+ }
+}
+
+void
+DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
+{
+ uint16_t streamOut;
+ uint32_t i;
+ DataChannel *channel;
+
+ if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
+ LOG(("*** Failed increasing number of streams from %u (%u/%u)",
+ mStreamsOut.Length(),
+ strchg->strchange_instrms,
+ strchg->strchange_outstrms));
+ // XXX FIX! notify pending opens of failure
+ return;
+ } else {
+ if (strchg->strchange_instrms > mStreamsIn.Length()) {
+ LOG(("Other side increased streamds from %u to %u",
+ mStreamsIn.Length(), strchg->strchange_instrms));
+ }
+ if (strchg->strchange_outstrms > mStreamsOut.Length()) {
+ uint16_t old_len = mStreamsOut.Length();
+ LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
+ old_len,
+ strchg->strchange_outstrms,
+ strchg->strchange_outstrms - old_len,
+ strchg->strchange_instrms));
+ // make sure both are the same length
+ mStreamsOut.AppendElements(strchg->strchange_outstrms - old_len);
+ LOG(("New length = %d (was %d)", mStreamsOut.Length(), old_len));
+ for (uint32_t i = old_len; i < mStreamsOut.Length(); ++i) {
+ mStreamsOut[i] = nullptr;
+ }
+ // Re-process any channels waiting for streams.
+ // Linear search, but we don't increase channels often and
+ // the array would only get long in case of an app error normally
+
+ // Make sure we request enough streams if there's a big jump in streams
+ // Could make a more complex API for OpenXxxFinish() and avoid this loop
+ int32_t num_needed = mPending.GetSize();
+ LOG(("%d of %d new streams already needed", num_needed,
+ strchg->strchange_outstrms - old_len));
+ num_needed -= (strchg->strchange_outstrms - old_len); // number we added
+ if (num_needed > 0) {
+ if (num_needed < 16)
+ num_needed = 16;
+ LOG(("Not enough new streams, asking for %d more", num_needed));
+ RequestMoreStreamsOut(num_needed);
+ }
+
+ // Can't copy nsDeque's. Move into temp array since any that fail will
+ // go back to mPending
+ nsDeque temp;
+ while (nullptr != (channel = static_cast<DataChannel *>(mPending.PopFront()))) {
+ temp.Push(channel);
+ }
+
+ // Now assign our new streams
+ while (nullptr != (channel = static_cast<DataChannel *>(temp.PopFront()))) {
+ if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_RSP) {
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_RSP;
+ OpenResponseFinish(channel); // may reset the flag and re-push
+ } else if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
+ channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
+ OpenFinish(channel); // may reset the flag and re-push
+ }
+ }
+ }
+ // else probably not a change in # of streams
+ }
+
+ for (i = 0; i < mStreamsOut.Length(); ++i) {
+ channel = mStreamsOut[i];
+ if (!channel)
+ continue;
+
+ if ((channel->mState == CONNECTING) &&
+ (channel->mStreamOut == INVALID_STREAM)) {
+ if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
+ (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
+ /* XXX: Signal to the other end. */
+ if (channel->mStreamIn != INVALID_STREAM) {
+ mStreamsIn[channel->mStreamIn] = nullptr;
+ }
+ channel->mState = CLOSED;
+ // inform user!
+ // XXX delete channel;
+ } else {
+ streamOut = FindFreeStreamOut();
+ if (streamOut != INVALID_STREAM) {
+ channel->mStreamOut = streamOut;
+ mStreamsOut[streamOut] = channel;
+ if (channel->mStreamIn == INVALID_STREAM) {
+ channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
+ } else {
+ channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_RSP;
+ }
+ StartDefer();
+ } else {
+ /* We will not find more ... */
+ break;
+ }
+ }
+ }
+ }
+}
+
+
+// Called with mLock locked!
+void
+DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
+{
+ mLock.AssertCurrentThreadOwns();
+ if (notif->sn_header.sn_length != (uint32_t)n) {
+ return;
+ }
+ switch (notif->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE:
+ HandleAssociationChangeEvent(&(notif->sn_assoc_change));
+ break;
+ case SCTP_PEER_ADDR_CHANGE:
+ HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
+ break;
+ case SCTP_REMOTE_ERROR:
+ HandleRemoteErrorEvent(&(notif->sn_remote_error));
+ break;
+ case SCTP_SHUTDOWN_EVENT:
+ HandleShutdownEvent(&(notif->sn_shutdown_event));
+ break;
+ case SCTP_ADAPTATION_INDICATION:
+ HandleAdaptationIndication(&(notif->sn_adaptation_event));
+ break;
+ case SCTP_PARTIAL_DELIVERY_EVENT:
+ LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
+ break;
+ case SCTP_AUTHENTICATION_EVENT:
+ LOG(("SCTP_AUTHENTICATION_EVENT"));
+ break;
+ case SCTP_SENDER_DRY_EVENT:
+ //LOG(("SCTP_SENDER_DRY_EVENT"));
+ break;
+ case SCTP_NOTIFICATIONS_STOPPED_EVENT:
+ LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
+ break;
+ case SCTP_SEND_FAILED_EVENT:
+ HandleSendFailedEvent(&(notif->sn_send_failed_event));
+ break;
+ case SCTP_STREAM_RESET_EVENT:
+ HandleStreamResetEvent(&(notif->sn_strreset_event));
+ break;
+ case SCTP_ASSOC_RESET_EVENT:
+ LOG(("SCTP_ASSOC_RESET_EVENT"));
+ break;
+ case SCTP_STREAM_CHANGE_EVENT:
+ HandleStreamChangeEvent(&(notif->sn_strchange_event));
+ break;
+ default:
+ LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
+ break;
+ }
+ }
+
+int
+DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
+ struct sctp_rcvinfo rcv, int32_t flags)
+{
+ MOZ_ASSERT(!NS_IsMainThread());
+
+ if (!data) {
+ usrsctp_close(sock); // SCTP has finished shutting down
+ } else {
+ MutexAutoLock lock(mLock);
+ if (flags & MSG_NOTIFICATION) {
+ HandleNotification(static_cast<union sctp_notification *>(data), datalen);
+ } else {
+ HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
+ }
+ }
+ // usrsctp defines the callback as returning an int, but doesn't use it
+ return 1;
+}
+
+DataChannel *
+DataChannelConnection::Open(const nsACString& label, Type type, bool inOrder,
+ uint32_t prValue, DataChannelListener *aListener,
+ nsISupports *aContext)
+{
+ DataChannel *channel;
+ uint16_t prPolicy = SCTP_PR_SCTP_NONE;
+ uint32_t flags;
+
+ LOG(("DC Open: label %s, type %u, inorder %d, prValue %u, listener %p, context %p",
+ PromiseFlatCString(label).get(), type, inOrder, prValue, aListener, aContext));
+ switch (type) {
+ case DATA_CHANNEL_RELIABLE:
+ prPolicy = SCTP_PR_SCTP_NONE;
+ break;
+ case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
+ prPolicy = SCTP_PR_SCTP_RTX;
+ break;
+ case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
+ prPolicy = SCTP_PR_SCTP_TTL;
+ break;
+ }
+ if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
+ return nullptr;
+ }
+
+ flags = !inOrder ? DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED : 0;
+ channel = new DataChannel(this, INVALID_STREAM, INVALID_STREAM,
+ DataChannel::CONNECTING,
+ label, type, prValue,
+ flags,
+ aListener, aContext); // infallible malloc
+
+ MutexAutoLock lock(mLock); // OpenFinish assumes this
+ return OpenFinish(channel);
+}
+
+// Separate routine so we can also call it to finish up from pending opens
+DataChannel *
+DataChannelConnection::OpenFinish(DataChannel *channel)
+{
+ uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM!
+
+ mLock.AssertCurrentThreadOwns();
+
+ LOG(("Finishing open: channel %p, streamOut = %u", channel, streamOut));
+
+ if (streamOut == INVALID_STREAM) {
+ if (!RequestMoreStreamsOut()) {
+ if (channel->mFlags &= DATA_CHANNEL_FLAGS_FINISH_OPEN) {
+ // We already returned the channel to the app. Mark it closed
+ channel->mState = CLOSED;
+ NS_ERROR("Failed to request more streams");
+ return channel;
+ }
+ // we can do this with the lock held because mStreamOut is INVALID_STREAM,
+ // so there's no outbound channel to reset
+ delete channel;
+ return nullptr;
+ }
+ LOG(("Queuing channel %p to finish open", channel));
+ // Also serves to mark we told the app
+ channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
+ mPending.Push(channel);
+ return channel;
+ }
+ mStreamsOut[streamOut] = channel;
+ channel->mStreamOut = streamOut;
+
+ if (!SendOpenRequestMessage(channel->mLabel, streamOut,
+ !!(channel->mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED),
+ channel->mPrPolicy, channel->mPrValue)) {
+ LOG(("SendOpenRequest failed, errno = %d", errno));
+ if (errno == EAGAIN) {
+ channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
+ StartDefer();
+ } else {
+ // XXX FIX! can't do this if we previously returned it! Need to internally mark it dead
+ // and file onerror
+ mStreamsOut[streamOut] = nullptr;
+ channel->mStreamOut = INVALID_STREAM;
+ // we can do this with the lock held because mStreamOut is INVALID_STREAM,
+ // so there's no outbound channel to reset (we didn't sent anything)
+ delete channel;
+ return nullptr;
+ }
+ }
+ return channel;
+}
+
+int32_t
+DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
+ uint32_t length, uint32_t ppid)
+{
+ uint16_t flags;
+ struct sctp_sendv_spa spa;
+ int32_t result;
+
+ NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
+ NS_WARN_IF_FALSE(length > 0, "Length is 0?!");
+
+ flags = (channel->mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED) ? SCTP_UNORDERED : 0;
+
+ // To avoid problems where an in-order OPEN_RESPONSE is lost and an
+ // out-of-order data message "beats" it, require data to be in-order
+ // until we get an ACK.
+ if (channel->mState == CONNECTING) {
+ flags &= ~SCTP_UNORDERED;
+ }
+ spa.sendv_sndinfo.snd_ppid = htonl(ppid);
+ spa.sendv_sndinfo.snd_sid = channel->mStreamOut;
+ spa.sendv_sndinfo.snd_flags = flags;
+ spa.sendv_sndinfo.snd_context = 0;
+ spa.sendv_sndinfo.snd_assoc_id = 0;
+
+ spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
+ spa.sendv_prinfo.pr_value = channel->mPrValue;
+
+ spa.sendv_flags = SCTP_SEND_SNDINFO_VALID | SCTP_SEND_PRINFO_VALID;
+
+ // Note: Main-thread IO, but doesn't block!
+ // XXX FIX! to deal with heavy overruns of JS trying to pass data in
+ // (more than the buffersize) queue data onto another thread to do the
+ // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
+
+ // SCTP will return EMSGSIZE if the message is bigger than the buffer
+ // size (or EAGAIN if there isn't space)
+ if (channel->mBufferedData.IsEmpty()) {
+ result = usrsctp_sendv(mSocket, data, length,
+ nullptr, 0,
+ (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
+ SCTP_SENDV_SPA, flags);
+ LOG(("Sent buffer (len=%u), result=%d", length, result));
+ } else {
+ // Fake EAGAIN if we're already buffering data
+ result = -1;
+ errno = EAGAIN;
+ }
+ if (result < 0) {
+ if (errno == EAGAIN) {
+ // queue data for resend! And queue any further data for the stream until it is...
+ BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc
+ channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
+ channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
+ LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
+ StartDefer();
+ return 0;
+ }
+ LOG(("error %d sending string", errno));
+ }
+ return result;
+}
+
+// Handles fragmenting binary messages
+int32_t
+DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
+ uint32_t len)
+{
+ // Since there's a limit on network buffer size and no limits on message
+ // size, and we don't want to use EOR mode (multiple writes for a
+ // message, but all other streams are blocked until you finish sending
+ // this message), we need to add application-level fragmentation of large
+ // messages. On a reliable channel, these can be simply rebuilt into a
+ // large message. On an unreliable channel, we can't and don't know how
+ // long to wait, and there are no retransmissions, and no easy way to
+ // tell the user "this part is missing", so on unreliable channels we
+ // need to return an error if sending more bytes than the network buffers
+ // can hold, and perhaps a lower number.
+
+ // We *really* don't want to do this from main thread! - and SendMsgInternal
+ // avoids blocking.
+ if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
+ channel->mPrPolicy == DATA_CHANNEL_RELIABLE) {
+ int32_t sent=0;
+ uint32_t origlen = len;
+ LOG(("Sending binary message length %u in chunks", len));
+ // XXX check flags for out-of-order, or force in-order for large binary messages
+ while (len > 0) {
+ uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
+ uint32_t ppid;
+ len -= sendlen;
+ ppid = len > 0 ? DATA_CHANNEL_PPID_BINARY : DATA_CHANNEL_PPID_BINARY_LAST;
+ LOG(("Send chunk of %d bytes, ppid %d", sendlen, ppid));
+ // Note that these might end up being deferred and queued.
+ sent += SendMsgInternal(channel, data, sendlen, ppid);
+ data += sendlen;
+ }
+ LOG(("Sent %d buffers for %u bytes, %d sent immediately, % buffers queued",
+ (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
+ origlen, sent,
+ channel->mBufferedData.Length()));
+ return sent;
+ }
+ NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
+ "Sending too-large data on unreliable channel!");
+
+ // This will fail if the message is too large
+ return SendMsgInternal(channel, data, len, DATA_CHANNEL_PPID_BINARY_LAST);
+}
+
+int32_t
+DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
+{
+ DataChannel *channel = mStreamsOut[stream];
+ NS_ENSURE_TRUE(channel, 0);
+ // Spawn a thread to send the data
+
+ LOG(("Sending blob to stream %u", stream));
+
+ // XXX to do this safely, we must enqueue these atomically onto the
+ // output socket. We need a sender thread(s?) to enque data into the
+ // socket and to avoid main-thread IO that might block. Even on a
+ // background thread, we may not want to block on one stream's data.
+ // I.e. run non-blocking and service multiple channels.
+
+ // For now as a hack, send as a single blast of queued packets which may
+ // be deferred until buffer space is available.
+ nsAutoPtr<nsCString> temp(new nsCString());
+ uint64_t len;
+ aBlob->Available(&len);
+ nsresult rv = NS_ReadInputStreamToString(aBlob, *temp, len);
+
+ NS_ENSURE_SUCCESS(rv, rv);
+
+ aBlob->Close();
+ //aBlob->Release(); We didn't AddRef() the way WebSocket does in OutboundMessage (yet)
+
+ // Consider if it makes sense to split the message ourselves for
+ // transmission, at least on RELIABLE channels. Sending large blobs via
+ // unreliable channels requires some level of application involvement, OR
+ // sending them at big, single messages, which if large will probably not
+ // get through.
+
+ // XXX For now, send as one large binary message. We should also signal
+ // (via PPID) that it's a blob.
+ const char *data = temp.get()->BeginReading();
+ len = temp.get()->Length();
+
+ return SendBinary(channel, data, len);
+}
+
+int32_t
+DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
+ bool isBinary)
+{
+ MOZ_ASSERT(NS_IsMainThread());
+ // We really could allow this from other threads, so long as we deal with
+ // asynchronosity issues with channels closing, in particular access to
+ // mStreamsOut, and issues with the association closing (access to mSocket).
+
+ const char *data = aMsg.BeginReading();
+ uint32_t len = aMsg.Length();
+ DataChannel *channel;
+
+ if (isBinary)
+ LOG(("Sending to stream %u: %u bytes", stream, len));
+ else
+ LOG(("Sending to stream %u: %s", stream, data));
+ // XXX if we want more efficiency, translate flags once at open time
+ channel = mStreamsOut[stream];
+ NS_ENSURE_TRUE(channel, 0);
+
+ if (isBinary)
+ return SendBinary(channel, data, len);
+ return SendMsgInternal(channel, data, len, DATA_CHANNEL_PPID_DOMSTRING);
+}
+
+void
+DataChannelConnection::Close(uint16_t streamOut)
+{
+ DataChannel *channel;
+
+ MutexAutoLock lock(mLock);
+ LOG(("Closing stream %d",streamOut));
+ channel = FindChannelByStreamOut(streamOut);
+ if (channel) {
+ channel->mBufferedData.Clear();
+ ResetOutgoingStream(channel->mStreamOut);
+ SendOutgoingStreamReset();
+ channel->mState = CLOSING;
+ }
+}
+
+void DataChannelConnection::CloseAll()
+{
+ LOG(("Closing all channels"));
+ // Don't need to lock here
+
+ // Make sure no more channels will be opened
+ mState = CLOSED;
+
+ // Close current channels
+ // FIX! if there are runnables, they must use weakrefs or hold a strong
+ // ref and keep the channel and/or connection alive
+ for (uint32_t i = 0; i < mStreamsOut.Length(); ++i) {
+ if (mStreamsOut[i]) {
+ mStreamsOut[i]->Close();
+ }
+ }
+
+ // Clean up any pending opens for channels
+ DataChannel *channel;
+ while (nullptr != (channel = static_cast<DataChannel *>(mPending.PopFront())))
+ channel->Close();
+}
+
+void
+DataChannel::Close()
+{
+ if (mState == CLOSING || mState == CLOSED ||
+ mStreamOut == INVALID_STREAM) {
+ return;
+ }
+ mState = CLOSING;
+ mConnection->Close(mStreamOut);
+ mStreamOut = INVALID_STREAM;
+ mStreamIn = INVALID_STREAM;
+}
+
+void
+DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
+{
+ MOZ_ASSERT(!mListener); // only should be set once, avoids races w/o locking
+ mContext = aContext;
+ mListener = aListener;
+}
+
+// May be called from another (i.e. Main) thread!
+void
+DataChannel::AppReady()
+{
+ MutexAutoLock lock(mConnection->mLock);
+
+ mReady = true;
+ if (mState == WAITING_TO_OPEN) {
+ mState = OPEN;
+ NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
+ DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
+ this));
+ for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
+ nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
+ MOZ_ASSERT(runnable);
+ NS_DispatchToMainThread(runnable);
+ }
+ } else {
+ NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
+ }
+ mQueuedMessages.Clear();
+ mQueuedMessages.Compact();
+ // We never use it again... We could even allocate the array in the odd
+ // cases we need it.
+}
+
+uint32_t
+DataChannel::GetBufferedAmount()
+{
+ uint32_t buffered = 0;
+ for (uint32_t i = 0; i < mBufferedData.Length(); ++i) {
+ buffered += mBufferedData[i]->mLength;
+ }
+ return buffered;
+}
+
+} // namespace mozilla
+
new file mode 100644
--- /dev/null
+++ b/netwerk/sctp/datachannel/DataChannel.h
@@ -0,0 +1,456 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_
+#define NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_
+
+#include <string>
+#include "nsISupports.h"
+#include "nsCOMPtr.h"
+#include "nsString.h"
+#include "nsThreadUtils.h"
+#include "nsTArray.h"
+#include "nsDeque.h"
+#include "nsIInputStream.h"
+#include "nsITimer.h"
+#include "mozilla/Mutex.h"
+#include "DataChannelProtocol.h"
+#ifdef SCTP_DTLS_SUPPORTED
+#include "talk/base/sigslot.h"
+#include "mtransport/transportflow.h"
+#include "mtransport/transportlayer.h"
+#include "mtransport/transportlayerprsock.h"
+#endif
+
+extern "C" {
+ struct socket;
+ struct sctp_rcvinfo;
+}
+
+namespace mozilla {
+
+class DTLSConnection;
+class DataChannelConnection;
+class DataChannel;
+class DataChannelOnMessageAvailable;
+
+class BufferedMsg
+{
+public:
+ BufferedMsg(struct sctp_sendv_spa &spa,const char *data,
+ uint32_t length);
+ ~BufferedMsg();
+
+ struct sctp_sendv_spa *mSpa;
+ const char *mData;
+ uint32_t mLength;
+};
+
+// Implemented by consumers of a Channel to receive messages.
+// Can't nest it in DataChannelConnection because C++ doesn't allow forward
+// refs to embedded classes
+class DataChannelListener {
+public:
+ virtual ~DataChannelListener() {}
+
+ // Called when a DOMString message is received.
+ virtual nsresult OnMessageAvailable(nsISupports *aContext,
+ const nsACString& message) = 0;
+
+ // Called when a binary message is received.
+ virtual nsresult OnBinaryMessageAvailable(nsISupports *aContext,
+ const nsACString& message) = 0;
+
+ // Called when the channel is connected
+ virtual nsresult OnChannelConnected(nsISupports *aContext) = 0;
+
+ // Called when the channel is closed
+ virtual nsresult OnChannelClosed(nsISupports *aContext) = 0;
+};
+
+
+// One per PeerConnection
+class DataChannelConnection: public nsITimerCallback
+#ifdef SCTP_DTLS_SUPPORTED
+ , public sigslot::has_slots<>
+#endif
+{
+public:
+ NS_DECL_ISUPPORTS
+ NS_DECL_NSITIMERCALLBACK
+
+ class DataConnectionListener {
+ public:
+ virtual ~DataConnectionListener() {}
+
+ // Called when a the connection is open
+ virtual void NotifyConnection() = 0;
+
+ // Called when a the connection is lost/closed
+ virtual void NotifyClosedConnection() = 0;
+
+ // Called when a new DataChannel has been opened by the other side.
+ virtual void NotifyDataChannel(DataChannel *channel) = 0;
+ };
+
+ DataChannelConnection(DataConnectionListener *listener);
+ virtual ~DataChannelConnection();
+
+ bool Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls);
+
+ // These block; they require something to decide on listener/connector
+ // (though you can do simultaneous Connect()). Do not call these from
+ // the main thread!
+ bool Listen(unsigned short port);
+ bool Connect(const char *addr, unsigned short port);
+
+#ifdef SCTP_DTLS_SUPPORTED
+ // Connect using a TransportFlow (DTLS) channel
+ bool ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport);
+#endif
+
+ typedef enum {
+ RELIABLE=0,
+ PARTIAL_RELIABLE_REXMIT = 1,
+ PARTIAL_RELIABLE_TIMED = 2
+ } Type;
+
+ DataChannel *Open(const nsACString& label,
+ Type type, bool inOrder,
+ uint32_t prValue, DataChannelListener *aListener,
+ nsISupports *aContext);
+
+ void Close(uint16_t stream);
+ void CloseAll();
+
+ int32_t SendMsg(uint16_t stream, const nsACString &aMsg)
+ {
+ return SendMsgCommon(stream, aMsg, false);
+ }
+ int32_t SendBinaryMsg(uint16_t stream, const nsACString &aMsg)
+ {
+ return SendMsgCommon(stream, aMsg, true);
+ }
+ int32_t SendBlob(uint16_t stream, nsIInputStream *aBlob);
+
+ // Called on data reception from the SCTP library
+ // must(?) be public so my c->c++ tramploine can call it
+ int ReceiveCallback(struct socket* sock, void *data, size_t datalen,
+ struct sctp_rcvinfo rcv, int32_t flags);
+
+ // Find out state
+ enum {
+ CONNECTING = 0U,
+ OPEN = 1U,
+ CLOSING = 2U,
+ CLOSED = 3U
+ };
+ uint16_t GetReadyState() { return mState; }
+
+ friend class DataChannel;
+ Mutex mLock;
+
+protected:
+ friend class DataChannelOnMessageAvailable;
+ DataConnectionListener *mListener;
+
+private:
+#ifdef SCTP_DTLS_SUPPORTED
+ static void DTLSConnectThread(void *data);
+ int SendPacket(const unsigned char* data, size_t len);
+ void PacketReceived(TransportFlow *flow, const unsigned char *data, size_t len);
+ static int SctpDtlsOutput(void *addr, void *buffer, size_t length, uint8_t tos, uint8_t set_df);
+#endif
+ DataChannel* FindChannelByStreamIn(uint16_t streamIn);
+ DataChannel* FindChannelByStreamOut(uint16_t streamOut);
+ uint16_t FindFreeStreamOut();
+ bool RequestMoreStreamsOut(int32_t aNeeded = 16);
+ int32_t SendControlMessage(void *msg, uint32_t len, uint16_t streamOut);
+ int32_t SendOpenRequestMessage(const nsACString& label,uint16_t streamOut,
+ bool unordered, uint16_t prPolicy, uint32_t prValue);
+ int32_t SendOpenResponseMessage(uint16_t streamOut, uint16_t streamIn);
+ int32_t SendOpenAckMessage(uint16_t streamOut);
+ int32_t SendMsgInternal(DataChannel *channel, const char *data,
+ uint32_t length, uint32_t ppid);
+ int32_t SendBinary(DataChannel *channel, const char *data,
+ uint32_t len);
+ int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
+
+ DataChannel *OpenFinish(DataChannel *channel);
+
+ void SendOrQueue(DataChannel *aChannel, DataChannelOnMessageAvailable *aMessage);
+ void StartDefer();
+ bool SendDeferredMessages();
+ void SendOutgoingStreamReset();
+ void ResetOutgoingStream(uint16_t streamOut);
+ void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
+ size_t length,
+ uint16_t streamIn);
+ void OpenResponseFinish(DataChannel *channel);
+ void HandleOpenResponseMessage(const struct rtcweb_datachannel_open_response *rsp,
+ size_t length, uint16_t streamIn);
+ void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
+ size_t length, uint16_t streamIn);
+ void HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t streamIn);
+ void HandleDataMessage(uint32_t ppid, const void *buffer, size_t length, uint16_t streamIn);
+ void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t streamIn);
+ void HandleAssociationChangeEvent(const struct sctp_assoc_change *sac);
+ void HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc);
+ void HandleRemoteErrorEvent(const struct sctp_remote_error *sre);
+ void HandleShutdownEvent(const struct sctp_shutdown_event *sse);
+ void HandleAdaptationIndication(const struct sctp_adaptation_event *sai);
+ void HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe);
+ void HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst);
+ void HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg);
+ void HandleNotification(const union sctp_notification *notif, size_t n);
+
+ // NOTE: while these arrays will auto-expand, increases in the number of
+ // channels available from the stack must be negotiated!
+ nsAutoTArray<DataChannel*,16> mStreamsOut;
+ nsAutoTArray<DataChannel*,16> mStreamsIn;
+ nsDeque mPending; // Holds DataChannels
+
+ // Streams pending reset
+ nsAutoTArray<uint16_t,4> mStreamsResetting;
+
+ struct socket *mMasterSocket;
+ struct socket *mSocket;
+ uint16_t mState;
+
+#ifdef SCTP_DTLS_SUPPORTED
+ nsRefPtr<TransportFlow> mTransportFlow;
+#endif
+ uint16_t mLocalPort;
+ uint16_t mRemotePort;
+
+ // Timer to control when we try to resend blocked messages
+ nsCOMPtr<nsITimer> mDeferredTimer;
+ uint32_t mDeferTimeout; // in ms
+ bool mTimerRunning;
+};
+
+class DataChannel {
+public:
+ enum {
+ CONNECTING = 0U,
+ OPEN = 1U,
+ CLOSING = 2U,
+ CLOSED = 3U,
+ WAITING_TO_OPEN = 4U
+ };
+
+ DataChannel(DataChannelConnection *connection,
+ uint16_t streamOut, uint16_t streamIn,
+ uint16_t state,
+ const nsACString& label,
+ uint16_t policy, uint32_t value,
+ uint32_t flags,
+ DataChannelListener *aListener,
+ nsISupports *aContext)
+ : mListener(aListener)
+ , mConnection(connection)
+ , mLabel(label)
+ , mState(state)
+ , mReady(false)
+ , mStreamOut(streamOut)
+ , mStreamIn(streamIn)
+ , mPrPolicy(policy)
+ , mPrValue(value)
+ , mFlags(0)
+ , mContext(aContext)
+ {
+ NS_ASSERTION(mConnection,"NULL connection");
+ }
+
+ ~DataChannel()
+ {
+ Close();
+ }
+
+ // Close this DataChannel. Can be called multiple times.
+ void Close();
+
+ // Set the listener (especially for channels created from the other side)
+ // Note: The Listener and Context should only be set once
+ void SetListener(DataChannelListener *aListener, nsISupports *aContext);
+
+ // Send a string
+ bool SendMsg(const nsACString &aMsg)
+ {
+ if (mStreamOut != INVALID_STREAM)
+ return (mConnection->SendMsg(mStreamOut, aMsg) > 0);
+ else
+ return false;
+ }
+
+ // Send a binary message (TypedArray)
+ bool SendBinaryMsg(const nsACString &aMsg)
+ {
+ if (mStreamOut != INVALID_STREAM)
+ return (mConnection->SendBinaryMsg(mStreamOut, aMsg) > 0);
+ else
+ return false;
+ }
+
+ // Send a binary blob
+ bool SendBinaryStream(nsIInputStream *aBlob, uint32_t msgLen)
+ {
+ if (mStreamOut != INVALID_STREAM)
+ return (mConnection->SendBlob(mStreamOut, aBlob) > 0);
+ else
+ return false;
+ }
+
+ uint16_t GetType() { return mPrPolicy; }
+
+ bool GetOrdered() { return !(mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED); }
+
+ // Amount of data buffered to send
+ uint32_t GetBufferedAmount();
+
+ // Find out state
+ uint16_t GetReadyState()
+ {
+ if (mState == WAITING_TO_OPEN)
+ return CONNECTING;
+ return mState;
+ }
+
+ void SetReadyState(uint16_t aState) { mState = aState; }
+
+ void GetLabel(nsAString& aLabel) { CopyUTF8toUTF16(mLabel, aLabel); }
+
+ void AppReady();
+
+protected:
+ DataChannelListener *mListener;
+
+private:
+ friend class DataChannelOnMessageAvailable;
+ friend class DataChannelConnection;
+
+ nsresult AddDataToBinaryMsg(const char *data, uint32_t size);
+
+ nsRefPtr<DataChannelConnection> mConnection;
+ nsCString mLabel;
+ uint16_t mState;
+ bool mReady;
+ uint16_t mStreamOut;
+ uint16_t mStreamIn;
+ uint16_t mPrPolicy;
+ uint32_t mPrValue;
+ uint32_t mFlags;
+ uint32_t mId;
+ nsCOMPtr<nsISupports> mContext;
+ nsCString mBinaryBuffer;
+ nsTArray<nsAutoPtr<BufferedMsg> > mBufferedData;
+ nsTArray<nsCOMPtr<nsIRunnable> > mQueuedMessages;
+};
+
+// used to dispatch notifications of incoming data to the main thread
+// Patterned on CallOnMessageAvailable in WebSockets
+// Also used to proxy other items to MainThread
+class DataChannelOnMessageAvailable : public nsRunnable
+{
+public:
+ enum {
+ ON_CONNECTION,
+ ON_DISCONNECTED,
+ ON_CHANNEL_CREATED,
+ ON_CHANNEL_OPEN,
+ ON_CHANNEL_CLOSED,
+ ON_DATA,
+ START_DEFER,
+ }; /* types */
+
+ DataChannelOnMessageAvailable(int32_t aType,
+ DataChannelConnection *aConnection,
+ DataChannel *aChannel,
+ nsCString &aData, // XXX this causes inefficiency
+ int32_t aLen)
+ : mType(aType),
+ mChannel(aChannel),
+ mConnection(aConnection),
+ mData(aData),
+ mLen(aLen) {}
+
+ DataChannelOnMessageAvailable(int32_t aType,
+ DataChannel *aChannel)
+ : mType(aType),
+ mChannel(aChannel) {}
+ // XXX is it safe to leave mData/mLen uninitialized? This should only be
+ // used for notifications that don't use them, but I'd like more
+ // bulletproof compile-time checking.
+
+ DataChannelOnMessageAvailable(int32_t aType,
+ DataChannelConnection *aConnection,
+ DataChannel *aChannel)
+ : mType(aType),
+ mChannel(aChannel),
+ mConnection(aConnection) {}
+
+ NS_IMETHOD Run()
+ {
+ switch (mType) {
+ case ON_DATA:
+ case ON_CHANNEL_OPEN:
+ case ON_CHANNEL_CLOSED:
+ if (!mChannel->mListener)
+ return NS_OK;
+ break;
+ case ON_CHANNEL_CREATED:
+ case ON_CONNECTION:
+ case ON_DISCONNECTED:
+ if (!mConnection->mListener)
+ return NS_OK;
+ break;
+ case START_DEFER:
+ break;
+ }
+ switch (mType) {
+ case ON_DATA:
+ if (mLen < 0) {
+ mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData);
+ } else {
+ mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext, mData);
+ }
+ break;
+ case ON_CHANNEL_OPEN:
+ mChannel->mListener->OnChannelConnected(mChannel->mContext);
+ break;
+ case ON_CHANNEL_CLOSED:
+ mChannel->mListener->OnChannelClosed(mChannel->mContext);
+ break;
+ case ON_CHANNEL_CREATED:
+ mConnection->mListener->NotifyDataChannel(mChannel);
+ break;
+ case ON_CONNECTION:
+ mConnection->mListener->NotifyConnection();
+ break;
+ case ON_DISCONNECTED:
+ mConnection->mListener->NotifyClosedConnection();
+ break;
+ case START_DEFER:
+ mConnection->StartDefer();
+ break;
+ }
+ return NS_OK;
+ }
+
+private:
+ ~DataChannelOnMessageAvailable() {}
+
+ int32_t mType;
+ // XXX should use union
+ DataChannel *mChannel; // XXX careful of ownership!
+ nsRefPtr<DataChannelConnection> mConnection;
+ nsCString mData;
+ int32_t mLen;
+};
+
+}
+
+#endif // NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_