author | Jason Duell <jduell.mcbugs@gmail.com> |
Thu, 15 Dec 2011 15:20:17 -0800 | |
changeset 84361 | 557f2ae6e8aaba6d68965f5137cc4752da85716e |
parent 84360 | 0b97cde3749314ee32c10c2257803359707493d5 |
child 84362 | 508b748801b73f2abc667deb65bf2d05590b578c |
push id | 519 |
push user | akeybl@mozilla.com |
push date | Wed, 01 Feb 2012 00:38:35 +0000 |
treeherder | mozilla-beta@788ea1ef610b [default view] [failures only] |
perfherder | [talos] [build metrics] [platform microbench] (compared to previous push) |
reviewers | mcmanus |
bugs | 676439 |
milestone | 11.0a1 |
first release with | nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
|
last release without | nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
|
--- a/netwerk/protocol/websocket/PWebSocket.ipdl +++ b/netwerk/protocol/websocket/PWebSocket.ipdl @@ -39,30 +39,32 @@ * ***** END LICENSE BLOCK ***** */ include protocol PNecko; include protocol PBrowser; include "mozilla/net/NeckoMessageUtils.h"; using IPC::URI; +using IPC::InputStream; namespace mozilla { namespace net { async protocol PWebSocket { manager PNecko; parent: // Forwarded methods corresponding to methods on nsIWebSocketChannel AsyncOpen(URI aURI, nsCString aOrigin, nsCString aProtocol, bool aSecure); Close(PRUint16 code, nsCString reason); SendMsg(nsCString aMsg); SendBinaryMsg(nsCString aMsg); + SendBinaryStream(InputStream aStream, PRUint32 aLength); DeleteSelf(); child: // Forwarded notifications corresponding to the nsIWebSocketListener interface OnStart(nsCString aProtocol, nsCString aExtensions); OnStop(nsresult aStatusCode); OnMessageAvailable(nsCString aMsg);
--- a/netwerk/protocol/websocket/WebSocketChannel.cpp +++ b/netwerk/protocol/websocket/WebSocketChannel.cpp @@ -61,16 +61,17 @@ #include "nsServiceManagerUtils.h" #include "nsXPIDLString.h" #include "nsCRT.h" #include "nsThreadUtils.h" #include "nsNetError.h" #include "nsStringStream.h" #include "nsAlgorithm.h" #include "nsProxyRelease.h" +#include "nsNetUtil.h" #include "plbase64.h" #include "prmem.h" #include "prnetdb.h" #include "prbit.h" #include "zlib.h" extern PRThread *gSocketThread; @@ -86,20 +87,16 @@ NS_IMPL_THREADSAFE_ISUPPORTS11(WebSocket nsIProtocolHandler, nsIInputStreamCallback, nsIOutputStreamCallback, nsITimerCallback, nsIDNSListener, nsIInterfaceRequestor, nsIChannelEventSink) -// Use this fake ptr so the Fin message stays in sequence in the -// main transmit queue -#define kFinMessage (reinterpret_cast<nsCString *>(0x01)) - // An implementation of draft-ietf-hybi-thewebsocketprotocol-08 #define SEC_WEBSOCKET_VERSION "8" /* * About SSL unsigned certificates * * wss will not work to a host using an unsigned certificate unless there * is already an exception (i.e. it cannot popup a dialog asking for @@ -108,29 +105,33 @@ NS_IMPL_THREADSAFE_ISUPPORTS11(WebSocket * be a problem in practice as it is expected the websocket javascript * is served from the same host as the websocket server (or of course, * a valid cert could just be provided). * */ // some helper classes +//----------------------------------------------------------------------------- +// CallOnMessageAvailable +//----------------------------------------------------------------------------- + class CallOnMessageAvailable : public nsIRunnable { public: NS_DECL_ISUPPORTS CallOnMessageAvailable(WebSocketChannel *aChannel, nsCString &aData, PRInt32 aLen) : mChannel(aChannel), mData(aData), mLen(aLen) {} - NS_SCRIPTABLE NS_IMETHOD Run() + NS_IMETHOD Run() { if (mLen < 0) mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData); else mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext, mData); return NS_OK; } @@ -138,120 +139,243 @@ private: ~CallOnMessageAvailable() {} nsRefPtr<WebSocketChannel> mChannel; nsCString mData; PRInt32 mLen; }; NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnMessageAvailable, nsIRunnable) +//----------------------------------------------------------------------------- +// CallOnStop +//----------------------------------------------------------------------------- + class CallOnStop : public nsIRunnable { public: NS_DECL_ISUPPORTS CallOnStop(WebSocketChannel *aChannel, nsresult aData) : mChannel(aChannel), mData(aData) {} - NS_SCRIPTABLE NS_IMETHOD Run() + NS_IMETHOD Run() { mChannel->mListener->OnStop(mChannel->mContext, mData); return NS_OK; } private: ~CallOnStop() {} nsRefPtr<WebSocketChannel> mChannel; nsresult mData; }; NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnStop, nsIRunnable) +//----------------------------------------------------------------------------- +// CallOnServerClose +//----------------------------------------------------------------------------- + class CallOnServerClose : public nsIRunnable { public: NS_DECL_ISUPPORTS CallOnServerClose(WebSocketChannel *aChannel, PRUint16 aCode, nsCString &aReason) : mChannel(aChannel), mCode(aCode), mReason(aReason) {} - NS_SCRIPTABLE NS_IMETHOD Run() + NS_IMETHOD Run() { mChannel->mListener->OnServerClose(mChannel->mContext, mCode, mReason); return NS_OK; } private: ~CallOnServerClose() {} nsRefPtr<WebSocketChannel> mChannel; PRUint16 mCode; nsCString mReason; }; NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnServerClose, nsIRunnable) +//----------------------------------------------------------------------------- +// CallAcknowledge +//----------------------------------------------------------------------------- + class CallAcknowledge : public nsIRunnable { public: NS_DECL_ISUPPORTS CallAcknowledge(WebSocketChannel *aChannel, PRUint32 aSize) : mChannel(aChannel), mSize(aSize) {} - NS_SCRIPTABLE NS_IMETHOD Run() + NS_IMETHOD Run() { LOG(("WebSocketChannel::CallAcknowledge: Size %u\n", mSize)); mChannel->mListener->OnAcknowledge(mChannel->mContext, mSize); return NS_OK; } private: ~CallAcknowledge() {} nsRefPtr<WebSocketChannel> mChannel; PRUint32 mSize; }; NS_IMPL_THREADSAFE_ISUPPORTS1(CallAcknowledge, nsIRunnable) -class nsPostMessage : public nsIRunnable +//----------------------------------------------------------------------------- +// OutboundMessage +//----------------------------------------------------------------------------- + +enum WsMsgType { + kMsgTypeString = 0, + kMsgTypeBinaryString, + kMsgTypeStream, + kMsgTypePing, + kMsgTypePong, + kMsgTypeFin +}; + +static const char* msgNames[] = { + "text", + "binaryString", + "binaryStream", + "ping", + "pong", + "close" +}; + +class OutboundMessage { public: - NS_DECL_ISUPPORTS - - nsPostMessage(WebSocketChannel *aChannel, - nsCString *aData, - PRInt32 aDataLen) - : mChannel(aChannel), - mData(aData), - mDataLen(aDataLen) {} - - NS_SCRIPTABLE NS_IMETHOD Run() + OutboundMessage(WsMsgType type, nsCString *str) + : mMsgType(type) + { + MOZ_COUNT_CTOR(OutboundMessage); + mMsg.pString = str; + mLength = str ? str->Length() : 0; + } + + OutboundMessage(nsIInputStream *stream, PRUint32 length) + : mMsgType(kMsgTypeStream), mLength(length) { - if (mData) - mChannel->SendMsgInternal(mData, mDataLen); + MOZ_COUNT_CTOR(OutboundMessage); + mMsg.pStream = stream; + mMsg.pStream->AddRef(); + } + + ~OutboundMessage() { + MOZ_COUNT_DTOR(OutboundMessage); + switch (mMsgType) { + case kMsgTypeString: + case kMsgTypeBinaryString: + case kMsgTypePing: + case kMsgTypePong: + delete mMsg.pString; + break; + case kMsgTypeStream: + // for now this only gets hit if msg deleted w/o being sent + if (mMsg.pStream) { + mMsg.pStream->Close(); + mMsg.pStream->Release(); + } + break; + case kMsgTypeFin: + break; // do-nothing: avoid compiler warning + } + } + + WsMsgType GetMsgType() const { return mMsgType; } + PRInt32 Length() const { return mLength; } + + PRUint8* BeginWriting() { + NS_ABORT_IF_FALSE(mMsgType != kMsgTypeStream, + "Stream should have been converted to string by now"); + return (PRUint8 *)(mMsg.pString ? mMsg.pString->BeginWriting() : nsnull); + } + + PRUint8* BeginReading() { + NS_ABORT_IF_FALSE(mMsgType != kMsgTypeStream, + "Stream should have been converted to string by now"); + return (PRUint8 *)(mMsg.pString ? mMsg.pString->BeginReading() : nsnull); + } + + nsresult ConvertStreamToString() + { + NS_ABORT_IF_FALSE(mMsgType == kMsgTypeStream, "Not a stream!"); + +#ifdef DEBUG + // Make sure we got correct length from Blob + PRUint32 bytes; + mMsg.pStream->Available(&bytes); + NS_ASSERTION(bytes == mLength, "Stream length != blob length!"); +#endif + + nsAutoPtr<nsCString> temp(new nsCString()); + nsresult rv = NS_ReadInputStreamToString(mMsg.pStream, *temp, mLength); + + NS_ENSURE_SUCCESS(rv, rv); + + mMsg.pStream->Close(); + mMsg.pStream->Release(); + mMsg.pString = temp.forget(); + mMsgType = kMsgTypeBinaryString; + return NS_OK; } private: - ~nsPostMessage() {} - - nsRefPtr<WebSocketChannel> mChannel; - nsCString *mData; - PRInt32 mDataLen; + union { + nsCString *pString; + nsIInputStream *pStream; + } mMsg; + WsMsgType mMsgType; + PRUint32 mLength; }; -NS_IMPL_THREADSAFE_ISUPPORTS1(nsPostMessage, nsIRunnable) - + +//----------------------------------------------------------------------------- +// OutboundEnqueuer +//----------------------------------------------------------------------------- + +class OutboundEnqueuer : public nsIRunnable +{ +public: + NS_DECL_ISUPPORTS + + OutboundEnqueuer(WebSocketChannel *aChannel, OutboundMessage *aMsg) + : mChannel(aChannel), mMessage(aMsg) {} + + NS_IMETHOD Run() + { + mChannel->EnqueueOutgoingMessage(mChannel->mOutgoingMessages, mMessage); + return NS_OK; + } + +private: + ~OutboundEnqueuer() {} + + nsRefPtr<WebSocketChannel> mChannel; + OutboundMessage *mMessage; +}; +NS_IMPL_THREADSAFE_ISUPPORTS1(OutboundEnqueuer, nsIRunnable) + +//----------------------------------------------------------------------------- +// nsWSAdmissionManager +//----------------------------------------------------------------------------- // Section 5.1 requires that a client rate limit its connects to a single // TCP session in the CONNECTING state (i.e. anything before the 101 upgrade // complete response comes back and an open javascript event is created) class nsWSAdmissionManager { public: @@ -404,20 +528,24 @@ private: return -1; } // ConnectedCount might be decremented from the main or the socket // thread, so manage it with atomic counters PRInt32 mConnectedCount; }; +//----------------------------------------------------------------------------- +// nsWSCompression +// // similar to nsDeflateConverter except for the mandatory FLUSH calls // required by websocket and the absence of the deflate termination // block which is appropriate because it would create data bytes after // sending the websockets CLOSE message. +//----------------------------------------------------------------------------- class nsWSCompression { public: nsWSCompression(nsIStreamListener *aListener, nsISupports *aContext) : mActive(false), mContext(aContext), @@ -529,17 +657,19 @@ private: nsIStreamListener *mListener; /* weak ref */ const static PRInt32 kBufferLen = 4096; PRUint8 mBuffer[kBufferLen]; }; static nsWSAdmissionManager *sWebSocketAdmissions = nsnull; +//----------------------------------------------------------------------------- // WebSocketChannel +//----------------------------------------------------------------------------- WebSocketChannel::WebSocketChannel() : mCloseTimeout(20000), mOpenTimeout(20000), mPingTimeout(0), mPingResponseTimeout(10000), mMaxConcurrentConnections(200), mRecvdHttpOnStartRequest(0), @@ -1048,16 +1178,19 @@ WebSocketChannel::ProcessInput(PRUint8 * mBuffered = 0; } return NS_OK; } void WebSocketChannel::ApplyMask(PRUint32 mask, PRUint8 *data, PRUint64 len) { + if (!data || len == 0) + return; + // Optimally we want to apply the mask 32 bits at a time, // but the buffer might not be alligned. So we first deal with // 0 to 3 bytes of preamble individually while (len && (reinterpret_cast<PRUptrdiff>(data) & 3)) { *data ^= mask >> 24; mask = PR_ROTATE_LEFT32(mask, 8); data++; @@ -1084,59 +1217,53 @@ WebSocketChannel::ApplyMask(PRUint32 mas data++; len--; } } void WebSocketChannel::GeneratePing() { - LOG(("WebSocketChannel::GeneratePing() %p\n", this)); - nsCString *buf = new nsCString(); buf->Assign("PING"); - mOutgoingPingMessages.Push(new OutboundMessage(buf)); - OnOutputStreamReady(mSocketOut); + EnqueueOutgoingMessage(mOutgoingPingMessages, + new OutboundMessage(kMsgTypePing, buf)); } void WebSocketChannel::GeneratePong(PRUint8 *payload, PRUint32 len) { - LOG(("WebSocketChannel::GeneratePong() %p [%p %u]\n", this, payload, len)); - nsCString *buf = new nsCString(); buf->SetLength(len); if (buf->Length() < len) { LOG(("WebSocketChannel::GeneratePong Allocation Failure\n")); delete buf; return; } memcpy(buf->BeginWriting(), payload, len); - mOutgoingPongMessages.Push(new OutboundMessage(buf)); - OnOutputStreamReady(mSocketOut); + EnqueueOutgoingMessage(mOutgoingPongMessages, + new OutboundMessage(kMsgTypePong, buf)); } void -WebSocketChannel::SendMsgInternal(nsCString *aMsg, - PRInt32 aDataLen) +WebSocketChannel::EnqueueOutgoingMessage(nsDeque &aQueue, + OutboundMessage *aMsg) { - LOG(("WebSocketChannel::SendMsgInternal %p [%p len=%d]\n", this, aMsg, - aDataLen)); NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); - if (aMsg == kFinMessage) { - mOutgoingMessages.Push(new OutboundMessage()); - } else if (aDataLen < 0) { - mOutgoingMessages.Push(new OutboundMessage(aMsg)); - } else { - mOutgoingMessages.Push(new OutboundMessage(aMsg, aDataLen)); - } + + LOG(("WebSocketChannel::EnqueueOutgoingMessage %p " + "queueing msg %p [type=%s len=%d]\n", + this, aMsg, msgNames[aMsg->GetMsgType()], aMsg->Length())); + + aQueue.Push(aMsg); OnOutputStreamReady(mSocketOut); } + PRUint16 WebSocketChannel::ResultToCloseCode(nsresult resultCode) { if (NS_SUCCEEDED(resultCode)) return CLOSE_NORMAL; if (resultCode == NS_ERROR_FILE_TOO_BIG) return CLOSE_TOO_LARGE; if (resultCode == NS_BASE_STREAM_CLOSED || @@ -1150,44 +1277,52 @@ WebSocketChannel::ResultToCloseCode(nsre void WebSocketChannel::PrimeNewOutgoingMessage() { LOG(("WebSocketChannel::PrimeNewOutgoingMessage() %p\n", this)); NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); NS_ABORT_IF_FALSE(!mCurrentOut, "Current message in progress"); - bool isPong = false; - bool isPing = false; + nsresult rv = NS_OK; mCurrentOut = (OutboundMessage *)mOutgoingPongMessages.PopFront(); if (mCurrentOut) { - isPong = true; + NS_ABORT_IF_FALSE(mCurrentOut->GetMsgType() == kMsgTypePong, + "Not pong message!"); } else { mCurrentOut = (OutboundMessage *)mOutgoingPingMessages.PopFront(); if (mCurrentOut) - isPing = true; + NS_ABORT_IF_FALSE(mCurrentOut->GetMsgType() == kMsgTypePing, + "Not ping message!"); else mCurrentOut = (OutboundMessage *)mOutgoingMessages.PopFront(); } if (!mCurrentOut) return; + + WsMsgType msgType = mCurrentOut->GetMsgType(); + + LOG(("WebSocketChannel::PrimeNewOutgoingMessage " + "%p found queued msg %p [type=%s len=%d]\n", + this, mCurrentOut, msgNames[msgType], mCurrentOut->Length())); + mCurrentOutSent = 0; mHdrOut = mOutHeader; PRUint8 *payload = nsnull; - if (mCurrentOut->IsControl() && !isPing && !isPong) { + + if (msgType == kMsgTypeFin) { // This is a demand to create a close message if (mClientClosed) { PrimeNewOutgoingMessage(); return; } - LOG(("WebSocketChannel:: PrimeNewOutgoingMessage() found close request\n")); mClientClosed = 1; mOutHeader[0] = kFinalFragBit | kClose; mOutHeader[1] = 0x02; // payload len = 2, maybe more for reason mOutHeader[1] |= kMaskBit; // payload is offset 6 including 4 for the mask payload = mOutHeader + 6; @@ -1215,40 +1350,55 @@ WebSocketChannel::PrimeNewOutgoingMessag if (mServerClosed) { /* bidi close complete */ mReleaseOnTransmit = 1; } else if (NS_FAILED(mStopOnClose)) { /* result of abort session - give up */ StopSession(mStopOnClose); } else { /* wait for reciprocal close from server */ - nsresult rv; mCloseTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); if (NS_SUCCEEDED(rv)) { mCloseTimer->InitWithCallback(this, mCloseTimeout, nsITimer::TYPE_ONE_SHOT); } else { StopSession(rv); } } } else { - if (isPong) { - LOG(("WebSocketChannel::PrimeNewOutgoingMessage() found pong request\n")); + switch (msgType) { + case kMsgTypePong: mOutHeader[0] = kFinalFragBit | kPong; - } else if (isPing) { - LOG(("WebSocketChannel::PrimeNewOutgoingMessage() found ping request\n")); + break; + case kMsgTypePing: mOutHeader[0] = kFinalFragBit | kPing; - } else if (mCurrentOut->BinaryLen() < 0) { - LOG(("WebSocketChannel::PrimeNewOutgoingMessage() " - "found queued text message len %d\n", mCurrentOut->Length())); + break; + case kMsgTypeString: mOutHeader[0] = kFinalFragBit | kText; - } else { - LOG(("WebSocketChannel::PrimeNewOutgoingMessage() " - "found queued binary message len %d\n", mCurrentOut->Length())); + break; + case kMsgTypeStream: + // HACK ALERT: read in entire stream into string. + // Will block socket transport thread if file is blocking. + // TODO: bug 704447: don't block socket thread! + rv = mCurrentOut->ConvertStreamToString(); + if (NS_FAILED(rv)) { + AbortSession(rv); + return; + } + // Now we're a binary string + msgType = kMsgTypeBinaryString; + + // no break: fall down into binary string case + + case kMsgTypeBinaryString: mOutHeader[0] = kFinalFragBit | kBinary; + break; + case kMsgTypeFin: + NS_ABORT_IF_FALSE(false, "unreachable"); // avoid compiler warning + break; } if (mCurrentOut->Length() < 126) { mOutHeader[1] = mCurrentOut->Length() | kMaskBit; mHdrOutToSend = 6; } else if (mCurrentOut->Length() <= 0xffff) { mOutHeader[1] = 126 | kMaskBit; ((PRUint16 *)mOutHeader)[1] = @@ -1261,17 +1411,17 @@ WebSocketChannel::PrimeNewOutgoingMessag memcpy(mOutHeader + 2, &tempLen, 8); mHdrOutToSend = 14; } payload = mOutHeader + mHdrOutToSend; } NS_ABORT_IF_FALSE(payload, "payload offset not found"); - // Perfom the sending mask. never use a zero mask + // Perform the sending mask. Never use a zero mask PRUint32 mask; do { PRUint8 *buffer; nsresult rv = mRandomGenerator->GenerateRandomBytes(4, &buffer); if (NS_FAILED(rv)) { LOG(("WebSocketChannel::PrimeNewOutgoingMessage(): " "GenerateRandomBytes failure %x\n", rv)); StopSession(rv); @@ -1503,18 +1653,19 @@ WebSocketChannel::AbortSession(nsresult if (mStopped) return; mStopped = 1; if (mTransport && reason != NS_BASE_STREAM_CLOSED && !mRequestedClose && !mClientClosed && !mServerClosed) { mRequestedClose = 1; - mSocketThread->Dispatch(new nsPostMessage(this, kFinMessage, -1), - nsIEventTarget::DISPATCH_NORMAL); + mSocketThread->Dispatch( + new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nsnull)), + nsIEventTarget::DISPATCH_NORMAL); mStopOnClose = reason; } else { StopSession(reason); } } // ReleaseSession is called on orderly shutdown void @@ -2115,67 +2266,74 @@ WebSocketChannel::Close(PRUint16 code, c // The API requires the UTF-8 string to be 123 or less bytes if (reason.Length() > 123) return NS_ERROR_ILLEGAL_VALUE; mRequestedClose = 1; mScriptCloseReason = reason; mScriptCloseCode = code; - - return mSocketThread->Dispatch(new nsPostMessage(this, kFinMessage, -1), - nsIEventTarget::DISPATCH_NORMAL); + + return mSocketThread->Dispatch( + new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nsnull)), + nsIEventTarget::DISPATCH_NORMAL); } NS_IMETHODIMP WebSocketChannel::SendMsg(const nsACString &aMsg) { LOG(("WebSocketChannel::SendMsg() %p\n", this)); - NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); - - if (mRequestedClose) { - LOG(("WebSocketChannel:: SendMsg when closed error\n")); - return NS_ERROR_UNEXPECTED; - } - - if (mStopped) { - LOG(("WebSocketChannel:: SendMsg when stopped error\n")); - return NS_ERROR_NOT_CONNECTED; - } - - return mSocketThread->Dispatch( - new nsPostMessage(this, new nsCString(aMsg), -1), - nsIEventTarget::DISPATCH_NORMAL); + + return SendMsgCommon(&aMsg, false, aMsg.Length()); } NS_IMETHODIMP WebSocketChannel::SendBinaryMsg(const nsACString &aMsg) { LOG(("WebSocketChannel::SendBinaryMsg() %p len=%d\n", this, aMsg.Length())); + return SendMsgCommon(&aMsg, true, aMsg.Length()); +} + +NS_IMETHODIMP +WebSocketChannel::SendBinaryStream(nsIInputStream *aStream, PRUint32 aLength) +{ + LOG(("WebSocketChannel::SendBinaryStream() %p\n", this)); + + return SendMsgCommon(nsnull, true, aLength, aStream); +} + +nsresult +WebSocketChannel::SendMsgCommon(const nsACString *aMsg, bool aIsBinary, + PRUint32 aLength, nsIInputStream *aStream) +{ NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); if (mRequestedClose) { - LOG(("WebSocketChannel:: SendBinaryMsg when closed error\n")); + LOG(("WebSocketChannel:: Error: send when closed\n")); return NS_ERROR_UNEXPECTED; } if (mStopped) { - LOG(("WebSocketChannel:: SendBinaryMsg when stopped error\n")); + LOG(("WebSocketChannel:: Error: send when stopped\n")); return NS_ERROR_NOT_CONNECTED; } - return mSocketThread->Dispatch(new nsPostMessage(this, new nsCString(aMsg), - aMsg.Length()), - nsIEventTarget::DISPATCH_NORMAL); + return mSocketThread->Dispatch( + aStream ? new OutboundEnqueuer(this, new OutboundMessage(aStream, aLength)) + : new OutboundEnqueuer(this, + new OutboundMessage(aIsBinary ? kMsgTypeBinaryString + : kMsgTypeString, + new nsCString(*aMsg))), + nsIEventTarget::DISPATCH_NORMAL); } NS_IMETHODIMP WebSocketChannel::OnTransportAvailable(nsISocketTransport *aTransport, - nsIAsyncInputStream *aSocketIn, - nsIAsyncOutputStream *aSocketOut) + nsIAsyncInputStream *aSocketIn, + nsIAsyncOutputStream *aSocketOut) { LOG(("WebSocketChannel::OnTransportAvailable %p [%p %p %p] rcvdonstart=%d\n", this, aTransport, aSocketIn, aSocketOut, mRecvdHttpOnStartRequest)); NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); NS_ABORT_IF_FALSE(!mRecvdHttpUpgradeTransport, "OTA duplicated"); NS_ABORT_IF_FALSE(aSocketIn, "OTA with invalid socketIn"); @@ -2196,17 +2354,17 @@ WebSocketChannel::OnTransportAvailable(n return StartWebsocketData(); return NS_OK; } // nsIRequestObserver (from nsIStreamListener) NS_IMETHODIMP WebSocketChannel::OnStartRequest(nsIRequest *aRequest, - nsISupports *aContext) + nsISupports *aContext) { LOG(("WebSocketChannel::OnStartRequest(): %p [%p %p] recvdhttpupgrade=%d\n", this, aRequest, aContext, mRecvdHttpUpgradeTransport)); NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); NS_ABORT_IF_FALSE(!mRecvdHttpOnStartRequest, "OTA duplicated"); // Generating the onStart event will take us out of the // CONNECTING state which means we can now open another,
--- a/netwerk/protocol/websocket/WebSocketChannel.h +++ b/netwerk/protocol/websocket/WebSocketChannel.h @@ -62,17 +62,18 @@ #include "BaseWebSocketChannel.h" #include "nsCOMPtr.h" #include "nsString.h" #include "nsDeque.h" namespace mozilla { namespace net { -class nsPostMessage; +class OutboundMessage; +class OutboundEnqueuer; class nsWSAdmissionManager; class nsWSCompression; class CallOnMessageAvailable; class CallOnStop; class CallOnServerClose; class CallAcknowledge; class WebSocketChannel : public BaseWebSocketChannel, @@ -101,16 +102,17 @@ public: // NS_IMETHOD AsyncOpen(nsIURI *aURI, const nsACString &aOrigin, nsIWebSocketListener *aListener, nsISupports *aContext); NS_IMETHOD Close(PRUint16 aCode, const nsACString & aReason); NS_IMETHOD SendMsg(const nsACString &aMsg); NS_IMETHOD SendBinaryMsg(const nsACString &aMsg); + NS_IMETHOD SendBinaryStream(nsIInputStream *aStream, PRUint32 length); NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo); WebSocketChannel(); static void Shutdown(); enum { // Non Control Frames kContinuation = 0x0, @@ -126,24 +128,29 @@ public: const static PRUint32 kControlFrameMask = 0x8; const static PRUint8 kMaskBit = 0x80; const static PRUint8 kFinalFragBit = 0x80; protected: virtual ~WebSocketChannel(); private: - friend class nsPostMessage; + friend class OutboundEnqueuer; friend class nsWSAdmissionManager; friend class CallOnMessageAvailable; friend class CallOnStop; friend class CallOnServerClose; friend class CallAcknowledge; - void SendMsgInternal(nsCString *aMsg, PRInt32 datalen); + // Common send code for binary + text msgs + nsresult SendMsgCommon(const nsACString *aMsg, bool isBinary, + PRUint32 length, nsIInputStream *aStream = NULL); + + void EnqueueOutgoingMessage(nsDeque &aQueue, OutboundMessage *aMsg); + void PrimeNewOutgoingMessage(); void GeneratePong(PRUint8 *payload, PRUint32 len); void GeneratePing(); nsresult BeginOpen(); nsresult HandleExtensions(); nsresult SetupRequest(); nsresult ApplyForAdmission(); @@ -158,58 +165,16 @@ private: void EnsureHdrOut(PRUint32 size); void ApplyMask(PRUint32 mask, PRUint8 *data, PRUint64 len); bool IsPersistentFramePtr(); nsresult ProcessInput(PRUint8 *buffer, PRUint32 count); PRUint32 UpdateReadBuffer(PRUint8 *buffer, PRUint32 count, PRUint32 accumulatedFragments); - class OutboundMessage - { - public: - OutboundMessage (nsCString *str) - : mMsg(str), mIsControl(false), mBinaryLen(-1) - { MOZ_COUNT_CTOR(WebSocketOutboundMessage); } - - OutboundMessage (nsCString *str, PRInt32 dataLen) - : mMsg(str), mIsControl(false), mBinaryLen(dataLen) - { MOZ_COUNT_CTOR(WebSocketOutboundMessage); } - - OutboundMessage () - : mMsg(nsnull), mIsControl(true), mBinaryLen(-1) - { MOZ_COUNT_CTOR(WebSocketOutboundMessage); } - - ~OutboundMessage() - { - MOZ_COUNT_DTOR(WebSocketOutboundMessage); - delete mMsg; - } - - bool IsControl() { return mIsControl; } - const nsCString *Msg() { return mMsg; } - PRInt32 BinaryLen() { return mBinaryLen; } - PRInt32 Length() - { - if (mBinaryLen >= 0) - return mBinaryLen; - return mMsg ? mMsg->Length() : 0; - } - PRUint8 *BeginWriting() { - return (PRUint8 *)(mMsg ? mMsg->BeginWriting() : nsnull); - } - PRUint8 *BeginReading() { - return (PRUint8 *)(mMsg ? mMsg->BeginReading() : nsnull); - } - - private: - nsCString *mMsg; - bool mIsControl; - PRInt32 mBinaryLen; - }; nsCOMPtr<nsIEventTarget> mSocketThread; nsCOMPtr<nsIHttpChannelInternal> mChannel; nsCOMPtr<nsIHttpChannel> mHttpChannel; nsCOMPtr<nsICancelable> mDNSRequest; nsCOMPtr<nsIAsyncVerifyRedirectCallback> mRedirectCallback; nsCOMPtr<nsIRandomGenerator> mRandomGenerator; @@ -229,17 +194,17 @@ private: nsCOMPtr<nsITimer> mPingTimer; PRUint32 mPingTimeout; /* milliseconds */ PRUint32 mPingResponseTimeout; /* milliseconds */ nsCOMPtr<nsITimer> mLingeringCloseTimer; const static PRInt32 kLingeringCloseTimeout = 1000; const static PRInt32 kLingeringCloseThreshold = 50; - PRUint32 mMaxConcurrentConnections; + PRInt32 mMaxConcurrentConnections; PRUint32 mRecvdHttpOnStartRequest : 1; PRUint32 mRecvdHttpUpgradeTransport : 1; PRUint32 mRequestedClose : 1; PRUint32 mClientClosed : 1; PRUint32 mServerClosed : 1; PRUint32 mStopped : 1; PRUint32 mCalledOnStop : 1;
--- a/netwerk/protocol/websocket/WebSocketChannelChild.cpp +++ b/netwerk/protocol/websocket/WebSocketChannelChild.cpp @@ -401,16 +401,27 @@ WebSocketChannelChild::SendBinaryMsg(con LOG(("WebSocketChannelChild::SendBinaryMsg() %p\n", this)); if (!mIPCOpen || !SendSendBinaryMsg(nsCString(aMsg))) return NS_ERROR_UNEXPECTED; return NS_OK; } NS_IMETHODIMP +WebSocketChannelChild::SendBinaryStream(nsIInputStream *aStream, + PRUint32 aLength) +{ + LOG(("WebSocketChannelChild::SendBinaryStream() %p\n", this)); + + if (!mIPCOpen || !SendSendBinaryStream(IPC::InputStream(aStream), aLength)) + return NS_ERROR_UNEXPECTED; + return NS_OK; +} + +NS_IMETHODIMP WebSocketChannelChild::GetSecurityInfo(nsISupports **aSecurityInfo) { LOG(("WebSocketChannelChild::GetSecurityInfo() %p\n", this)); return NS_ERROR_NOT_AVAILABLE; } } // namespace net } // namespace mozilla
--- a/netwerk/protocol/websocket/WebSocketChannelChild.h +++ b/netwerk/protocol/websocket/WebSocketChannelChild.h @@ -55,24 +55,23 @@ class WebSocketChannelChild : public Bas public: WebSocketChannelChild(bool aSecure); ~WebSocketChannelChild(); NS_DECL_ISUPPORTS // nsIWebSocketChannel methods BaseWebSocketChannel didn't implement for us // - NS_SCRIPTABLE NS_IMETHOD AsyncOpen(nsIURI *aURI, - const nsACString &aOrigin, - nsIWebSocketListener *aListener, - nsISupports *aContext); - NS_SCRIPTABLE NS_IMETHOD Close(PRUint16 code, const nsACString & reason); - NS_SCRIPTABLE NS_IMETHOD SendMsg(const nsACString &aMsg); - NS_SCRIPTABLE NS_IMETHOD SendBinaryMsg(const nsACString &aMsg); - NS_SCRIPTABLE NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo); + NS_IMETHOD AsyncOpen(nsIURI *aURI, const nsACString &aOrigin, + nsIWebSocketListener *aListener, nsISupports *aContext); + NS_IMETHOD Close(PRUint16 code, const nsACString & reason); + NS_IMETHOD SendMsg(const nsACString &aMsg); + NS_IMETHOD SendBinaryMsg(const nsACString &aMsg); + NS_IMETHOD SendBinaryStream(nsIInputStream *aStream, PRUint32 aLength); + NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo); void AddIPDLReference(); void ReleaseIPDLReference(); private: bool RecvOnStart(const nsCString& aProtocol, const nsCString& aExtensions); bool RecvOnStop(const nsresult& aStatusCode); bool RecvOnMessageAvailable(const nsCString& aMsg);
--- a/netwerk/protocol/websocket/WebSocketChannelParent.cpp +++ b/netwerk/protocol/websocket/WebSocketChannelParent.cpp @@ -132,16 +132,28 @@ WebSocketChannelParent::RecvSendBinaryMs LOG(("WebSocketChannelParent::RecvSendBinaryMsg() %p\n", this)); if (mChannel) { nsresult rv = mChannel->SendBinaryMsg(aMsg); NS_ENSURE_SUCCESS(rv, true); } return true; } +bool +WebSocketChannelParent::RecvSendBinaryStream(const InputStream& aStream, + const PRUint32& aLength) +{ + LOG(("WebSocketChannelParent::RecvSendBinaryStream() %p\n", this)); + if (mChannel) { + nsresult rv = mChannel->SendBinaryStream(aStream, aLength); + NS_ENSURE_SUCCESS(rv, true); + } + return true; +} + NS_IMETHODIMP WebSocketChannelParent::GetInterface(const nsIID & iid, void **result NS_OUTPARAM) { LOG(("WebSocketChannelParent::GetInterface() %p\n", this)); if (mAuthProvider && iid.Equals(NS_GET_IID(nsIAuthPromptProvider))) return mAuthProvider->GetAuthPrompt(nsIAuthPromptProvider::PROMPT_NORMAL, iid, result);
--- a/netwerk/protocol/websocket/WebSocketChannelParent.h +++ b/netwerk/protocol/websocket/WebSocketChannelParent.h @@ -65,16 +65,18 @@ class WebSocketChannelParent : public PW private: bool RecvAsyncOpen(const IPC::URI& aURI, const nsCString& aOrigin, const nsCString& aProtocol, const bool& aSecure); bool RecvClose(const PRUint16 & code, const nsCString & reason); bool RecvSendMsg(const nsCString& aMsg); bool RecvSendBinaryMsg(const nsCString& aMsg); + bool RecvSendBinaryStream(const InputStream& aStream, + const PRUint32& aLength); bool RecvDeleteSelf(); void ActorDestroy(ActorDestroyReason why); nsCOMPtr<nsIAuthPromptProvider> mAuthProvider; nsCOMPtr<nsIWebSocketChannel> mChannel; bool mIPCOpen; };
--- a/netwerk/protocol/websocket/nsIWebSocketChannel.idl +++ b/netwerk/protocol/websocket/nsIWebSocketChannel.idl @@ -36,20 +36,24 @@ * the terms of any one of the MPL, the GPL or the LGPL. * * ***** END LICENSE BLOCK ***** */ interface nsIURI; interface nsIInterfaceRequestor; interface nsILoadGroup; interface nsIWebSocketListener; +interface nsIInputStream; #include "nsISupports.idl" -[scriptable, uuid(e8ae0371-c28f-4d61-b257-514e014a4686)] +/** + * You probably want nsI{Moz}WebSocket.idl + */ +[uuid(bb69e5d7-d9cd-4aab-9abe-98f80cf8b8b8)] interface nsIWebSocketChannel : nsISupports { /** * The original URI used to construct the protocol connection. This is used * in the case of a redirect or URI "resolution" (e.g. resolving a * resource: URI to a file: URI) so that the original pre-redirect * URI can still be obtained. This is never null. */ @@ -135,9 +139,17 @@ interface nsIWebSocketChannel : nsISuppo void sendMsg(in AUTF8String aMsg); /** * Use to send binary message down the connection to WebSocket peer. * * @param aMsg the data to send */ void sendBinaryMsg(in ACString aMsg); + + /** + * Use to send a binary stream (Blob) to Websocket peer. + * + * @param aStream The input stream to be sent. + */ + void sendBinaryStream(in nsIInputStream aStream, + in unsigned long length); };