Bug 676439 - Websocket Binary Message support: Necko changes. r=mcmanus
authorJason Duell <jduell.mcbugs@gmail.com>
Thu, 15 Dec 2011 15:20:17 -0800
changeset 84361 557f2ae6e8aaba6d68965f5137cc4752da85716e
parent 84360 0b97cde3749314ee32c10c2257803359707493d5
child 84362 508b748801b73f2abc667deb65bf2d05590b578c
push id519
push userakeybl@mozilla.com
push dateWed, 01 Feb 2012 00:38:35 +0000
treeherdermozilla-beta@788ea1ef610b [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersmcmanus
bugs676439
milestone11.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 676439 - Websocket Binary Message support: Necko changes. r=mcmanus
netwerk/protocol/websocket/PWebSocket.ipdl
netwerk/protocol/websocket/WebSocketChannel.cpp
netwerk/protocol/websocket/WebSocketChannel.h
netwerk/protocol/websocket/WebSocketChannelChild.cpp
netwerk/protocol/websocket/WebSocketChannelChild.h
netwerk/protocol/websocket/WebSocketChannelParent.cpp
netwerk/protocol/websocket/WebSocketChannelParent.h
netwerk/protocol/websocket/nsIWebSocketChannel.idl
--- a/netwerk/protocol/websocket/PWebSocket.ipdl
+++ b/netwerk/protocol/websocket/PWebSocket.ipdl
@@ -39,30 +39,32 @@
  * ***** END LICENSE BLOCK ***** */
 
 include protocol PNecko;
 include protocol PBrowser;
 
 include "mozilla/net/NeckoMessageUtils.h";
 
 using IPC::URI;
+using IPC::InputStream;
 
 namespace mozilla {
 namespace net {
 
 async protocol PWebSocket
 {
   manager PNecko;
 
 parent:
   // Forwarded methods corresponding to methods on nsIWebSocketChannel
   AsyncOpen(URI aURI, nsCString aOrigin, nsCString aProtocol, bool aSecure);
   Close(PRUint16 code, nsCString reason);
   SendMsg(nsCString aMsg);
   SendBinaryMsg(nsCString aMsg);
+  SendBinaryStream(InputStream aStream, PRUint32 aLength);
 
   DeleteSelf();
 
 child:
   // Forwarded notifications corresponding to the nsIWebSocketListener interface
   OnStart(nsCString aProtocol, nsCString aExtensions);
   OnStop(nsresult aStatusCode);
   OnMessageAvailable(nsCString aMsg);
--- a/netwerk/protocol/websocket/WebSocketChannel.cpp
+++ b/netwerk/protocol/websocket/WebSocketChannel.cpp
@@ -61,16 +61,17 @@
 #include "nsServiceManagerUtils.h"
 #include "nsXPIDLString.h"
 #include "nsCRT.h"
 #include "nsThreadUtils.h"
 #include "nsNetError.h"
 #include "nsStringStream.h"
 #include "nsAlgorithm.h"
 #include "nsProxyRelease.h"
+#include "nsNetUtil.h"
 
 #include "plbase64.h"
 #include "prmem.h"
 #include "prnetdb.h"
 #include "prbit.h"
 #include "zlib.h"
 
 extern PRThread *gSocketThread;
@@ -86,20 +87,16 @@ NS_IMPL_THREADSAFE_ISUPPORTS11(WebSocket
                                nsIProtocolHandler,
                                nsIInputStreamCallback,
                                nsIOutputStreamCallback,
                                nsITimerCallback,
                                nsIDNSListener,
                                nsIInterfaceRequestor,
                                nsIChannelEventSink)
 
-// Use this fake ptr so the Fin message stays in sequence in the
-// main transmit queue
-#define kFinMessage (reinterpret_cast<nsCString *>(0x01))
-
 // An implementation of draft-ietf-hybi-thewebsocketprotocol-08
 #define SEC_WEBSOCKET_VERSION "8"
 
 /*
  * About SSL unsigned certificates
  *
  * wss will not work to a host using an unsigned certificate unless there
  * is already an exception (i.e. it cannot popup a dialog asking for
@@ -108,29 +105,33 @@ NS_IMPL_THREADSAFE_ISUPPORTS11(WebSocket
  * be a problem in practice as it is expected the websocket javascript
  * is served from the same host as the websocket server (or of course,
  * a valid cert could just be provided).
  *
  */
 
 // some helper classes
 
+//-----------------------------------------------------------------------------
+// CallOnMessageAvailable
+//-----------------------------------------------------------------------------
+
 class CallOnMessageAvailable : public nsIRunnable
 {
 public:
   NS_DECL_ISUPPORTS
 
   CallOnMessageAvailable(WebSocketChannel *aChannel,
                          nsCString        &aData,
                          PRInt32           aLen)
     : mChannel(aChannel),
       mData(aData),
       mLen(aLen) {}
 
-  NS_SCRIPTABLE NS_IMETHOD Run()
+  NS_IMETHOD Run()
   {
     if (mLen < 0)
       mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData);
     else
       mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext, mData);
     return NS_OK;
   }
 
@@ -138,120 +139,243 @@ private:
   ~CallOnMessageAvailable() {}
 
   nsRefPtr<WebSocketChannel>        mChannel;
   nsCString                         mData;
   PRInt32                           mLen;
 };
 NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnMessageAvailable, nsIRunnable)
 
+//-----------------------------------------------------------------------------
+// CallOnStop
+//-----------------------------------------------------------------------------
+
 class CallOnStop : public nsIRunnable
 {
 public:
   NS_DECL_ISUPPORTS
 
   CallOnStop(WebSocketChannel *aChannel,
              nsresult          aData)
     : mChannel(aChannel),
       mData(aData) {}
 
-  NS_SCRIPTABLE NS_IMETHOD Run()
+  NS_IMETHOD Run()
   {
     mChannel->mListener->OnStop(mChannel->mContext, mData);
     return NS_OK;
   }
 
 private:
   ~CallOnStop() {}
 
   nsRefPtr<WebSocketChannel>        mChannel;
   nsresult                          mData;
 };
 NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnStop, nsIRunnable)
 
+//-----------------------------------------------------------------------------
+// CallOnServerClose
+//-----------------------------------------------------------------------------
+
 class CallOnServerClose : public nsIRunnable
 {
 public:
   NS_DECL_ISUPPORTS
 
   CallOnServerClose(WebSocketChannel *aChannel,
                     PRUint16          aCode,
                     nsCString        &aReason)
     : mChannel(aChannel),
       mCode(aCode),
       mReason(aReason) {}
 
-  NS_SCRIPTABLE NS_IMETHOD Run()
+  NS_IMETHOD Run()
   {
     mChannel->mListener->OnServerClose(mChannel->mContext, mCode, mReason);
     return NS_OK;
   }
 
 private:
   ~CallOnServerClose() {}
 
   nsRefPtr<WebSocketChannel>        mChannel;
   PRUint16                          mCode;
   nsCString                         mReason;
 };
 NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnServerClose, nsIRunnable)
 
+//-----------------------------------------------------------------------------
+// CallAcknowledge
+//-----------------------------------------------------------------------------
+
 class CallAcknowledge : public nsIRunnable
 {
 public:
   NS_DECL_ISUPPORTS
 
   CallAcknowledge(WebSocketChannel *aChannel,
                   PRUint32          aSize)
     : mChannel(aChannel),
       mSize(aSize) {}
 
-  NS_SCRIPTABLE NS_IMETHOD Run()
+  NS_IMETHOD Run()
   {
     LOG(("WebSocketChannel::CallAcknowledge: Size %u\n", mSize));
     mChannel->mListener->OnAcknowledge(mChannel->mContext, mSize);
     return NS_OK;
   }
 
 private:
   ~CallAcknowledge() {}
 
   nsRefPtr<WebSocketChannel>        mChannel;
   PRUint32                          mSize;
 };
 NS_IMPL_THREADSAFE_ISUPPORTS1(CallAcknowledge, nsIRunnable)
 
-class nsPostMessage : public nsIRunnable
+//-----------------------------------------------------------------------------
+// OutboundMessage
+//-----------------------------------------------------------------------------
+
+enum WsMsgType {
+  kMsgTypeString = 0,
+  kMsgTypeBinaryString,
+  kMsgTypeStream,
+  kMsgTypePing,
+  kMsgTypePong,
+  kMsgTypeFin
+};
+
+static const char* msgNames[] = {
+  "text",
+  "binaryString",
+  "binaryStream",
+  "ping",
+  "pong",
+  "close"
+};
+
+class OutboundMessage
 {
 public:
-  NS_DECL_ISUPPORTS
-
-  nsPostMessage(WebSocketChannel *aChannel,
-                nsCString        *aData,
-                PRInt32           aDataLen)
-    : mChannel(aChannel),
-      mData(aData),
-      mDataLen(aDataLen) {}
-
-  NS_SCRIPTABLE NS_IMETHOD Run()
+  OutboundMessage(WsMsgType type, nsCString *str)
+    : mMsgType(type)
+  {
+    MOZ_COUNT_CTOR(OutboundMessage);
+    mMsg.pString = str;
+    mLength = str ? str->Length() : 0;
+  }
+
+  OutboundMessage(nsIInputStream *stream, PRUint32 length)
+    : mMsgType(kMsgTypeStream), mLength(length)
   {
-    if (mData)
-      mChannel->SendMsgInternal(mData, mDataLen);
+    MOZ_COUNT_CTOR(OutboundMessage);
+    mMsg.pStream = stream;
+    mMsg.pStream->AddRef();
+  }
+
+ ~OutboundMessage() {
+    MOZ_COUNT_DTOR(OutboundMessage);
+    switch (mMsgType) {
+      case kMsgTypeString:
+      case kMsgTypeBinaryString:
+      case kMsgTypePing:
+      case kMsgTypePong:
+        delete mMsg.pString;
+        break;
+      case kMsgTypeStream:
+        // for now this only gets hit if msg deleted w/o being sent
+        if (mMsg.pStream) {
+          mMsg.pStream->Close();
+          mMsg.pStream->Release();
+        }
+        break;
+      case kMsgTypeFin:
+        break;    // do-nothing: avoid compiler warning
+    }
+  }
+
+  WsMsgType GetMsgType() const { return mMsgType; }
+  PRInt32 Length() const { return mLength; }
+
+  PRUint8* BeginWriting() {
+    NS_ABORT_IF_FALSE(mMsgType != kMsgTypeStream,
+                      "Stream should have been converted to string by now");
+    return (PRUint8 *)(mMsg.pString ? mMsg.pString->BeginWriting() : nsnull);
+  }
+
+  PRUint8* BeginReading() {
+    NS_ABORT_IF_FALSE(mMsgType != kMsgTypeStream,
+                      "Stream should have been converted to string by now");
+    return (PRUint8 *)(mMsg.pString ? mMsg.pString->BeginReading() : nsnull);
+  }
+
+  nsresult ConvertStreamToString()
+  {
+    NS_ABORT_IF_FALSE(mMsgType == kMsgTypeStream, "Not a stream!");
+
+#ifdef DEBUG
+    // Make sure we got correct length from Blob
+    PRUint32 bytes;
+    mMsg.pStream->Available(&bytes);
+    NS_ASSERTION(bytes == mLength, "Stream length != blob length!");
+#endif
+
+    nsAutoPtr<nsCString> temp(new nsCString());
+    nsresult rv = NS_ReadInputStreamToString(mMsg.pStream, *temp, mLength);
+
+    NS_ENSURE_SUCCESS(rv, rv);
+
+    mMsg.pStream->Close();
+    mMsg.pStream->Release();
+    mMsg.pString = temp.forget();
+    mMsgType = kMsgTypeBinaryString;
+
     return NS_OK;
   }
 
 private:
-  ~nsPostMessage() {}
-
-  nsRefPtr<WebSocketChannel>    mChannel;
-  nsCString                    *mData;
-  PRInt32                       mDataLen;
+  union {
+    nsCString      *pString;
+    nsIInputStream *pStream;
+  }                           mMsg;
+  WsMsgType                   mMsgType;
+  PRUint32                    mLength;
 };
-NS_IMPL_THREADSAFE_ISUPPORTS1(nsPostMessage, nsIRunnable)
-
+
+//-----------------------------------------------------------------------------
+// OutboundEnqueuer
+//-----------------------------------------------------------------------------
+
+class OutboundEnqueuer : public nsIRunnable
+{
+public:
+  NS_DECL_ISUPPORTS
+
+  OutboundEnqueuer(WebSocketChannel *aChannel, OutboundMessage *aMsg)
+    : mChannel(aChannel), mMessage(aMsg) {}
+
+  NS_IMETHOD Run()
+  {
+    mChannel->EnqueueOutgoingMessage(mChannel->mOutgoingMessages, mMessage);
+    return NS_OK;
+  }
+
+private:
+  ~OutboundEnqueuer() {}
+
+  nsRefPtr<WebSocketChannel>  mChannel;
+  OutboundMessage            *mMessage;
+};
+NS_IMPL_THREADSAFE_ISUPPORTS1(OutboundEnqueuer, nsIRunnable)
+
+//-----------------------------------------------------------------------------
+// nsWSAdmissionManager
+//-----------------------------------------------------------------------------
 
 // Section 5.1 requires that a client rate limit its connects to a single
 // TCP session in the CONNECTING state (i.e. anything before the 101 upgrade
 // complete response comes back and an open javascript event is created)
 
 class nsWSAdmissionManager
 {
 public:
@@ -404,20 +528,24 @@ private:
     return -1;
   }
 
   // ConnectedCount might be decremented from the main or the socket
   // thread, so manage it with atomic counters
   PRInt32 mConnectedCount;
 };
 
+//-----------------------------------------------------------------------------
+// nsWSCompression
+//
 // similar to nsDeflateConverter except for the mandatory FLUSH calls
 // required by websocket and the absence of the deflate termination
 // block which is appropriate because it would create data bytes after
 // sending the websockets CLOSE message.
+//-----------------------------------------------------------------------------
 
 class nsWSCompression
 {
 public:
   nsWSCompression(nsIStreamListener *aListener,
                   nsISupports *aContext)
     : mActive(false),
       mContext(aContext),
@@ -529,17 +657,19 @@ private:
   nsIStreamListener              *mListener;    /* weak ref */
 
   const static PRInt32            kBufferLen = 4096;
   PRUint8                         mBuffer[kBufferLen];
 };
 
 static nsWSAdmissionManager *sWebSocketAdmissions = nsnull;
 
+//-----------------------------------------------------------------------------
 // WebSocketChannel
+//-----------------------------------------------------------------------------
 
 WebSocketChannel::WebSocketChannel() :
   mCloseTimeout(20000),
   mOpenTimeout(20000),
   mPingTimeout(0),
   mPingResponseTimeout(10000),
   mMaxConcurrentConnections(200),
   mRecvdHttpOnStartRequest(0),
@@ -1048,16 +1178,19 @@ WebSocketChannel::ProcessInput(PRUint8 *
     mBuffered = 0;
   }
   return NS_OK;
 }
 
 void
 WebSocketChannel::ApplyMask(PRUint32 mask, PRUint8 *data, PRUint64 len)
 {
+  if (!data || len == 0)
+    return;
+
   // Optimally we want to apply the mask 32 bits at a time,
   // but the buffer might not be alligned. So we first deal with
   // 0 to 3 bytes of preamble individually
 
   while (len && (reinterpret_cast<PRUptrdiff>(data) & 3)) {
     *data ^= mask >> 24;
     mask = PR_ROTATE_LEFT32(mask, 8);
     data++;
@@ -1084,59 +1217,53 @@ WebSocketChannel::ApplyMask(PRUint32 mas
     data++;
     len--;
   }
 }
 
 void
 WebSocketChannel::GeneratePing()
 {
-  LOG(("WebSocketChannel::GeneratePing() %p\n", this));
-
   nsCString *buf = new nsCString();
   buf->Assign("PING");
-  mOutgoingPingMessages.Push(new OutboundMessage(buf));
-  OnOutputStreamReady(mSocketOut);
+  EnqueueOutgoingMessage(mOutgoingPingMessages,
+                         new OutboundMessage(kMsgTypePing, buf));
 }
 
 void
 WebSocketChannel::GeneratePong(PRUint8 *payload, PRUint32 len)
 {
-  LOG(("WebSocketChannel::GeneratePong() %p [%p %u]\n", this, payload, len));
-
   nsCString *buf = new nsCString();
   buf->SetLength(len);
   if (buf->Length() < len) {
     LOG(("WebSocketChannel::GeneratePong Allocation Failure\n"));
     delete buf;
     return;
   }
 
   memcpy(buf->BeginWriting(), payload, len);
-  mOutgoingPongMessages.Push(new OutboundMessage(buf));
-  OnOutputStreamReady(mSocketOut);
+  EnqueueOutgoingMessage(mOutgoingPongMessages,
+                         new OutboundMessage(kMsgTypePong, buf));
 }
 
 void
-WebSocketChannel::SendMsgInternal(nsCString *aMsg,
-                                    PRInt32 aDataLen)
+WebSocketChannel::EnqueueOutgoingMessage(nsDeque &aQueue,
+                                         OutboundMessage *aMsg)
 {
-  LOG(("WebSocketChannel::SendMsgInternal %p [%p len=%d]\n", this, aMsg,
-       aDataLen));
   NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread");
-  if (aMsg == kFinMessage) {
-    mOutgoingMessages.Push(new OutboundMessage());
-  } else if (aDataLen < 0) {
-    mOutgoingMessages.Push(new OutboundMessage(aMsg));
-  } else {
-    mOutgoingMessages.Push(new OutboundMessage(aMsg, aDataLen));
-  }
+
+  LOG(("WebSocketChannel::EnqueueOutgoingMessage %p "
+       "queueing msg %p [type=%s len=%d]\n",
+       this, aMsg, msgNames[aMsg->GetMsgType()], aMsg->Length()));
+
+  aQueue.Push(aMsg);
   OnOutputStreamReady(mSocketOut);
 }
 
+
 PRUint16
 WebSocketChannel::ResultToCloseCode(nsresult resultCode)
 {
   if (NS_SUCCEEDED(resultCode))
     return CLOSE_NORMAL;
   if (resultCode == NS_ERROR_FILE_TOO_BIG)
     return CLOSE_TOO_LARGE;
   if (resultCode == NS_BASE_STREAM_CLOSED ||
@@ -1150,44 +1277,52 @@ WebSocketChannel::ResultToCloseCode(nsre
 
 void
 WebSocketChannel::PrimeNewOutgoingMessage()
 {
   LOG(("WebSocketChannel::PrimeNewOutgoingMessage() %p\n", this));
   NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread");
   NS_ABORT_IF_FALSE(!mCurrentOut, "Current message in progress");
 
-  bool isPong = false;
-  bool isPing = false;
+  nsresult rv = NS_OK;
 
   mCurrentOut = (OutboundMessage *)mOutgoingPongMessages.PopFront();
   if (mCurrentOut) {
-    isPong = true;
+    NS_ABORT_IF_FALSE(mCurrentOut->GetMsgType() == kMsgTypePong,
+                     "Not pong message!");
   } else {
     mCurrentOut = (OutboundMessage *)mOutgoingPingMessages.PopFront();
     if (mCurrentOut)
-      isPing = true;
+      NS_ABORT_IF_FALSE(mCurrentOut->GetMsgType() == kMsgTypePing,
+                        "Not ping message!");
     else
       mCurrentOut = (OutboundMessage *)mOutgoingMessages.PopFront();
   }
 
   if (!mCurrentOut)
     return;
+
+  WsMsgType msgType = mCurrentOut->GetMsgType();
+
+  LOG(("WebSocketChannel::PrimeNewOutgoingMessage "
+       "%p found queued msg %p [type=%s len=%d]\n",
+       this, mCurrentOut, msgNames[msgType], mCurrentOut->Length()));
+
   mCurrentOutSent = 0;
   mHdrOut = mOutHeader;
 
   PRUint8 *payload = nsnull;
-  if (mCurrentOut->IsControl() && !isPing && !isPong) {
+
+  if (msgType == kMsgTypeFin) {
     // This is a demand to create a close message
     if (mClientClosed) {
       PrimeNewOutgoingMessage();
       return;
     }
 
-    LOG(("WebSocketChannel:: PrimeNewOutgoingMessage() found close request\n"));
     mClientClosed = 1;
     mOutHeader[0] = kFinalFragBit | kClose;
     mOutHeader[1] = 0x02; // payload len = 2, maybe more for reason
     mOutHeader[1] |= kMaskBit;
 
     // payload is offset 6 including 4 for the mask
     payload = mOutHeader + 6;
 
@@ -1215,40 +1350,55 @@ WebSocketChannel::PrimeNewOutgoingMessag
     if (mServerClosed) {
       /* bidi close complete */
       mReleaseOnTransmit = 1;
     } else if (NS_FAILED(mStopOnClose)) {
       /* result of abort session - give up */
       StopSession(mStopOnClose);
     } else {
       /* wait for reciprocal close from server */
-      nsresult rv;
       mCloseTimer = do_CreateInstance("@mozilla.org/timer;1", &rv);
       if (NS_SUCCEEDED(rv)) {
         mCloseTimer->InitWithCallback(this, mCloseTimeout,
                                       nsITimer::TYPE_ONE_SHOT);
       } else {
         StopSession(rv);
       }
     }
   } else {
-    if (isPong) {
-      LOG(("WebSocketChannel::PrimeNewOutgoingMessage() found pong request\n"));
+    switch (msgType) {
+    case kMsgTypePong:
       mOutHeader[0] = kFinalFragBit | kPong;
-    } else if (isPing) {
-      LOG(("WebSocketChannel::PrimeNewOutgoingMessage() found ping request\n"));
+      break;
+    case kMsgTypePing:
       mOutHeader[0] = kFinalFragBit | kPing;
-    } else if (mCurrentOut->BinaryLen() < 0) {
-      LOG(("WebSocketChannel::PrimeNewOutgoingMessage() "
-           "found queued text message len %d\n", mCurrentOut->Length()));
+      break;
+    case kMsgTypeString:
       mOutHeader[0] = kFinalFragBit | kText;
-    } else {
-      LOG(("WebSocketChannel::PrimeNewOutgoingMessage() "
-           "found queued binary message len %d\n", mCurrentOut->Length()));
+      break;
+    case kMsgTypeStream:
+      // HACK ALERT:  read in entire stream into string.
+      // Will block socket transport thread if file is blocking.
+      // TODO: bug 704447:  don't block socket thread!
+      rv = mCurrentOut->ConvertStreamToString();
+      if (NS_FAILED(rv)) {
+        AbortSession(rv);
+        return;
+      }
+      // Now we're a binary string
+      msgType = kMsgTypeBinaryString;
+
+      // no break: fall down into binary string case
+
+    case kMsgTypeBinaryString:
       mOutHeader[0] = kFinalFragBit | kBinary;
+      break;
+    case kMsgTypeFin:
+      NS_ABORT_IF_FALSE(false, "unreachable");  // avoid compiler warning
+      break;
     }
 
     if (mCurrentOut->Length() < 126) {
       mOutHeader[1] = mCurrentOut->Length() | kMaskBit;
       mHdrOutToSend = 6;
     } else if (mCurrentOut->Length() <= 0xffff) {
       mOutHeader[1] = 126 | kMaskBit;
       ((PRUint16 *)mOutHeader)[1] =
@@ -1261,17 +1411,17 @@ WebSocketChannel::PrimeNewOutgoingMessag
       memcpy(mOutHeader + 2, &tempLen, 8);
       mHdrOutToSend = 14;
     }
     payload = mOutHeader + mHdrOutToSend;
   }
 
   NS_ABORT_IF_FALSE(payload, "payload offset not found");
 
-  // Perfom the sending mask. never use a zero mask
+  // Perform the sending mask. Never use a zero mask
   PRUint32 mask;
   do {
     PRUint8 *buffer;
     nsresult rv = mRandomGenerator->GenerateRandomBytes(4, &buffer);
     if (NS_FAILED(rv)) {
       LOG(("WebSocketChannel::PrimeNewOutgoingMessage(): "
            "GenerateRandomBytes failure %x\n", rv));
       StopSession(rv);
@@ -1503,18 +1653,19 @@ WebSocketChannel::AbortSession(nsresult 
 
   if (mStopped)
     return;
   mStopped = 1;
 
   if (mTransport && reason != NS_BASE_STREAM_CLOSED &&
       !mRequestedClose && !mClientClosed && !mServerClosed) {
     mRequestedClose = 1;
-    mSocketThread->Dispatch(new nsPostMessage(this, kFinMessage, -1),
-                            nsIEventTarget::DISPATCH_NORMAL);
+    mSocketThread->Dispatch(
+      new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nsnull)),
+                           nsIEventTarget::DISPATCH_NORMAL);
     mStopOnClose = reason;
   } else {
     StopSession(reason);
   }
 }
 
 // ReleaseSession is called on orderly shutdown
 void
@@ -2115,67 +2266,74 @@ WebSocketChannel::Close(PRUint16 code, c
 
   // The API requires the UTF-8 string to be 123 or less bytes
   if (reason.Length() > 123)
     return NS_ERROR_ILLEGAL_VALUE;
 
   mRequestedClose = 1;
   mScriptCloseReason = reason;
   mScriptCloseCode = code;
-    
-  return mSocketThread->Dispatch(new nsPostMessage(this, kFinMessage, -1),
-                                 nsIEventTarget::DISPATCH_NORMAL);
+
+  return mSocketThread->Dispatch(
+      new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nsnull)),
+                           nsIEventTarget::DISPATCH_NORMAL);
 }
 
 NS_IMETHODIMP
 WebSocketChannel::SendMsg(const nsACString &aMsg)
 {
   LOG(("WebSocketChannel::SendMsg() %p\n", this));
-  NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
-
-  if (mRequestedClose) {
-    LOG(("WebSocketChannel:: SendMsg when closed error\n"));
-    return NS_ERROR_UNEXPECTED;
-  }
-
-  if (mStopped) {
-    LOG(("WebSocketChannel:: SendMsg when stopped error\n"));
-    return NS_ERROR_NOT_CONNECTED;
-  }
-
-  return mSocketThread->Dispatch(
-                          new nsPostMessage(this, new nsCString(aMsg), -1),
-                          nsIEventTarget::DISPATCH_NORMAL);
+
+  return SendMsgCommon(&aMsg, false, aMsg.Length());
 }
 
 NS_IMETHODIMP
 WebSocketChannel::SendBinaryMsg(const nsACString &aMsg)
 {
   LOG(("WebSocketChannel::SendBinaryMsg() %p len=%d\n", this, aMsg.Length()));
+  return SendMsgCommon(&aMsg, true, aMsg.Length());
+}
+
+NS_IMETHODIMP
+WebSocketChannel::SendBinaryStream(nsIInputStream *aStream, PRUint32 aLength)
+{
+  LOG(("WebSocketChannel::SendBinaryStream() %p\n", this));
+
+  return SendMsgCommon(nsnull, true, aLength, aStream);
+}
+
+nsresult
+WebSocketChannel::SendMsgCommon(const nsACString *aMsg, bool aIsBinary,
+                                PRUint32 aLength, nsIInputStream *aStream)
+{
   NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
 
   if (mRequestedClose) {
-    LOG(("WebSocketChannel:: SendBinaryMsg when closed error\n"));
+    LOG(("WebSocketChannel:: Error: send when closed\n"));
     return NS_ERROR_UNEXPECTED;
   }
 
   if (mStopped) {
-    LOG(("WebSocketChannel:: SendBinaryMsg when stopped error\n"));
+    LOG(("WebSocketChannel:: Error: send when stopped\n"));
     return NS_ERROR_NOT_CONNECTED;
   }
 
-  return mSocketThread->Dispatch(new nsPostMessage(this, new nsCString(aMsg), 
-                                                   aMsg.Length()),
-                                 nsIEventTarget::DISPATCH_NORMAL);
+  return mSocketThread->Dispatch(
+    aStream ? new OutboundEnqueuer(this, new OutboundMessage(aStream, aLength))
+            : new OutboundEnqueuer(this,
+                     new OutboundMessage(aIsBinary ? kMsgTypeBinaryString
+                                                   : kMsgTypeString,
+                                         new nsCString(*aMsg))),
+    nsIEventTarget::DISPATCH_NORMAL);
 }
 
 NS_IMETHODIMP
 WebSocketChannel::OnTransportAvailable(nsISocketTransport *aTransport,
-                                         nsIAsyncInputStream *aSocketIn,
-                                         nsIAsyncOutputStream *aSocketOut)
+                                       nsIAsyncInputStream *aSocketIn,
+                                       nsIAsyncOutputStream *aSocketOut)
 {
   LOG(("WebSocketChannel::OnTransportAvailable %p [%p %p %p] rcvdonstart=%d\n",
        this, aTransport, aSocketIn, aSocketOut, mRecvdHttpOnStartRequest));
 
   NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
   NS_ABORT_IF_FALSE(!mRecvdHttpUpgradeTransport, "OTA duplicated");
   NS_ABORT_IF_FALSE(aSocketIn, "OTA with invalid socketIn");
 
@@ -2196,17 +2354,17 @@ WebSocketChannel::OnTransportAvailable(n
     return StartWebsocketData();
   return NS_OK;
 }
 
 // nsIRequestObserver (from nsIStreamListener)
 
 NS_IMETHODIMP
 WebSocketChannel::OnStartRequest(nsIRequest *aRequest,
-                                   nsISupports *aContext)
+                                 nsISupports *aContext)
 {
   LOG(("WebSocketChannel::OnStartRequest(): %p [%p %p] recvdhttpupgrade=%d\n",
        this, aRequest, aContext, mRecvdHttpUpgradeTransport));
   NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
   NS_ABORT_IF_FALSE(!mRecvdHttpOnStartRequest, "OTA duplicated");
 
   // Generating the onStart event will take us out of the
   // CONNECTING state which means we can now open another,
--- a/netwerk/protocol/websocket/WebSocketChannel.h
+++ b/netwerk/protocol/websocket/WebSocketChannel.h
@@ -62,17 +62,18 @@
 #include "BaseWebSocketChannel.h"
 
 #include "nsCOMPtr.h"
 #include "nsString.h"
 #include "nsDeque.h"
 
 namespace mozilla { namespace net {
 
-class nsPostMessage;
+class OutboundMessage;
+class OutboundEnqueuer;
 class nsWSAdmissionManager;
 class nsWSCompression;
 class CallOnMessageAvailable;
 class CallOnStop;
 class CallOnServerClose;
 class CallAcknowledge;
 
 class WebSocketChannel : public BaseWebSocketChannel,
@@ -101,16 +102,17 @@ public:
   //
   NS_IMETHOD AsyncOpen(nsIURI *aURI,
                        const nsACString &aOrigin,
                        nsIWebSocketListener *aListener,
                        nsISupports *aContext);
   NS_IMETHOD Close(PRUint16 aCode, const nsACString & aReason);
   NS_IMETHOD SendMsg(const nsACString &aMsg);
   NS_IMETHOD SendBinaryMsg(const nsACString &aMsg);
+  NS_IMETHOD SendBinaryStream(nsIInputStream *aStream, PRUint32 length);
   NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo);
 
   WebSocketChannel();
   static void Shutdown();
 
   enum {
     // Non Control Frames
     kContinuation = 0x0,
@@ -126,24 +128,29 @@ public:
   const static PRUint32 kControlFrameMask   = 0x8;
   const static PRUint8 kMaskBit             = 0x80;
   const static PRUint8 kFinalFragBit        = 0x80;
 
 protected:
   virtual ~WebSocketChannel();
 
 private:
-  friend class nsPostMessage;
+  friend class OutboundEnqueuer;
   friend class nsWSAdmissionManager;
   friend class CallOnMessageAvailable;
   friend class CallOnStop;
   friend class CallOnServerClose;
   friend class CallAcknowledge;
 
-  void SendMsgInternal(nsCString *aMsg, PRInt32 datalen);
+  // Common send code for binary + text msgs
+  nsresult SendMsgCommon(const nsACString *aMsg, bool isBinary,
+                         PRUint32 length, nsIInputStream *aStream = NULL);
+
+  void EnqueueOutgoingMessage(nsDeque &aQueue, OutboundMessage *aMsg);
+
   void PrimeNewOutgoingMessage();
   void GeneratePong(PRUint8 *payload, PRUint32 len);
   void GeneratePing();
 
   nsresult BeginOpen();
   nsresult HandleExtensions();
   nsresult SetupRequest();
   nsresult ApplyForAdmission();
@@ -158,58 +165,16 @@ private:
   void EnsureHdrOut(PRUint32 size);
   void ApplyMask(PRUint32 mask, PRUint8 *data, PRUint64 len);
 
   bool     IsPersistentFramePtr();
   nsresult ProcessInput(PRUint8 *buffer, PRUint32 count);
   PRUint32 UpdateReadBuffer(PRUint8 *buffer, PRUint32 count,
                             PRUint32 accumulatedFragments);
 
-  class OutboundMessage
-  {
-  public:
-    OutboundMessage (nsCString *str)
-      : mMsg(str), mIsControl(false), mBinaryLen(-1)
-    { MOZ_COUNT_CTOR(WebSocketOutboundMessage); }
-
-    OutboundMessage (nsCString *str, PRInt32 dataLen)
-      : mMsg(str), mIsControl(false), mBinaryLen(dataLen)
-    { MOZ_COUNT_CTOR(WebSocketOutboundMessage); }
-
-    OutboundMessage ()
-      : mMsg(nsnull), mIsControl(true), mBinaryLen(-1)
-    { MOZ_COUNT_CTOR(WebSocketOutboundMessage); }
-
-    ~OutboundMessage()
-    {
-      MOZ_COUNT_DTOR(WebSocketOutboundMessage);
-      delete mMsg;
-    }
-
-    bool IsControl()  { return mIsControl; }
-    const nsCString *Msg()  { return mMsg; }
-    PRInt32 BinaryLen() { return mBinaryLen; }
-    PRInt32 Length()
-    {
-      if (mBinaryLen >= 0)
-        return mBinaryLen;
-      return mMsg ? mMsg->Length() : 0;
-    }
-    PRUint8 *BeginWriting() {
-      return (PRUint8 *)(mMsg ? mMsg->BeginWriting() : nsnull);
-    }
-    PRUint8 *BeginReading() {
-      return (PRUint8 *)(mMsg ? mMsg->BeginReading() : nsnull);
-    }
-
-  private:
-    nsCString *mMsg;
-    bool       mIsControl;
-    PRInt32    mBinaryLen;
-  };
 
   nsCOMPtr<nsIEventTarget>                 mSocketThread;
   nsCOMPtr<nsIHttpChannelInternal>         mChannel;
   nsCOMPtr<nsIHttpChannel>                 mHttpChannel;
   nsCOMPtr<nsICancelable>                  mDNSRequest;
   nsCOMPtr<nsIAsyncVerifyRedirectCallback> mRedirectCallback;
   nsCOMPtr<nsIRandomGenerator>             mRandomGenerator;
 
@@ -229,17 +194,17 @@ private:
   nsCOMPtr<nsITimer>              mPingTimer;
   PRUint32                        mPingTimeout;  /* milliseconds */
   PRUint32                        mPingResponseTimeout;  /* milliseconds */
 
   nsCOMPtr<nsITimer>              mLingeringCloseTimer;
   const static PRInt32            kLingeringCloseTimeout =   1000;
   const static PRInt32            kLingeringCloseThreshold = 50;
 
-  PRUint32                        mMaxConcurrentConnections;
+  PRInt32                         mMaxConcurrentConnections;
 
   PRUint32                        mRecvdHttpOnStartRequest   : 1;
   PRUint32                        mRecvdHttpUpgradeTransport : 1;
   PRUint32                        mRequestedClose            : 1;
   PRUint32                        mClientClosed              : 1;
   PRUint32                        mServerClosed              : 1;
   PRUint32                        mStopped                   : 1;
   PRUint32                        mCalledOnStop              : 1;
--- a/netwerk/protocol/websocket/WebSocketChannelChild.cpp
+++ b/netwerk/protocol/websocket/WebSocketChannelChild.cpp
@@ -401,16 +401,27 @@ WebSocketChannelChild::SendBinaryMsg(con
   LOG(("WebSocketChannelChild::SendBinaryMsg() %p\n", this));
 
   if (!mIPCOpen || !SendSendBinaryMsg(nsCString(aMsg)))
     return NS_ERROR_UNEXPECTED;
   return NS_OK;
 }
 
 NS_IMETHODIMP
+WebSocketChannelChild::SendBinaryStream(nsIInputStream *aStream,
+                                        PRUint32 aLength)
+{
+  LOG(("WebSocketChannelChild::SendBinaryStream() %p\n", this));
+
+  if (!mIPCOpen || !SendSendBinaryStream(IPC::InputStream(aStream), aLength))
+    return NS_ERROR_UNEXPECTED;
+  return NS_OK;
+}
+
+NS_IMETHODIMP
 WebSocketChannelChild::GetSecurityInfo(nsISupports **aSecurityInfo)
 {
   LOG(("WebSocketChannelChild::GetSecurityInfo() %p\n", this));
   return NS_ERROR_NOT_AVAILABLE;
 }
 
 } // namespace net
 } // namespace mozilla
--- a/netwerk/protocol/websocket/WebSocketChannelChild.h
+++ b/netwerk/protocol/websocket/WebSocketChannelChild.h
@@ -55,24 +55,23 @@ class WebSocketChannelChild : public Bas
  public:
   WebSocketChannelChild(bool aSecure);
   ~WebSocketChannelChild();
 
   NS_DECL_ISUPPORTS
 
   // nsIWebSocketChannel methods BaseWebSocketChannel didn't implement for us
   //
-  NS_SCRIPTABLE NS_IMETHOD AsyncOpen(nsIURI *aURI,
-                                     const nsACString &aOrigin,
-                                     nsIWebSocketListener *aListener,
-                                     nsISupports *aContext);
-  NS_SCRIPTABLE NS_IMETHOD Close(PRUint16 code, const nsACString & reason);
-  NS_SCRIPTABLE NS_IMETHOD SendMsg(const nsACString &aMsg);
-  NS_SCRIPTABLE NS_IMETHOD SendBinaryMsg(const nsACString &aMsg);
-  NS_SCRIPTABLE NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo);
+  NS_IMETHOD AsyncOpen(nsIURI *aURI, const nsACString &aOrigin,
+                       nsIWebSocketListener *aListener, nsISupports *aContext);
+  NS_IMETHOD Close(PRUint16 code, const nsACString & reason);
+  NS_IMETHOD SendMsg(const nsACString &aMsg);
+  NS_IMETHOD SendBinaryMsg(const nsACString &aMsg);
+  NS_IMETHOD SendBinaryStream(nsIInputStream *aStream, PRUint32 aLength);
+  NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo);
 
   void AddIPDLReference();
   void ReleaseIPDLReference();
 
  private:
   bool RecvOnStart(const nsCString& aProtocol, const nsCString& aExtensions);
   bool RecvOnStop(const nsresult& aStatusCode);
   bool RecvOnMessageAvailable(const nsCString& aMsg);
--- a/netwerk/protocol/websocket/WebSocketChannelParent.cpp
+++ b/netwerk/protocol/websocket/WebSocketChannelParent.cpp
@@ -132,16 +132,28 @@ WebSocketChannelParent::RecvSendBinaryMs
   LOG(("WebSocketChannelParent::RecvSendBinaryMsg() %p\n", this));
   if (mChannel) {
     nsresult rv = mChannel->SendBinaryMsg(aMsg);
     NS_ENSURE_SUCCESS(rv, true);
   }
   return true;
 }
 
+bool
+WebSocketChannelParent::RecvSendBinaryStream(const InputStream& aStream,
+                                             const PRUint32& aLength)
+{
+  LOG(("WebSocketChannelParent::RecvSendBinaryStream() %p\n", this));
+  if (mChannel) {
+    nsresult rv = mChannel->SendBinaryStream(aStream, aLength);
+    NS_ENSURE_SUCCESS(rv, true);
+  }
+  return true;
+}
+
 NS_IMETHODIMP
 WebSocketChannelParent::GetInterface(const nsIID & iid, void **result NS_OUTPARAM)
 {
   LOG(("WebSocketChannelParent::GetInterface() %p\n", this));
   if (mAuthProvider && iid.Equals(NS_GET_IID(nsIAuthPromptProvider)))
     return mAuthProvider->GetAuthPrompt(nsIAuthPromptProvider::PROMPT_NORMAL,
                                         iid, result);
 
--- a/netwerk/protocol/websocket/WebSocketChannelParent.h
+++ b/netwerk/protocol/websocket/WebSocketChannelParent.h
@@ -65,16 +65,18 @@ class WebSocketChannelParent : public PW
  private:
   bool RecvAsyncOpen(const IPC::URI& aURI,
                      const nsCString& aOrigin,
                      const nsCString& aProtocol,
                      const bool& aSecure);
   bool RecvClose(const PRUint16 & code, const nsCString & reason);
   bool RecvSendMsg(const nsCString& aMsg);
   bool RecvSendBinaryMsg(const nsCString& aMsg);
+  bool RecvSendBinaryStream(const InputStream& aStream,
+                            const PRUint32& aLength);
   bool RecvDeleteSelf();
 
   void ActorDestroy(ActorDestroyReason why);
 
   nsCOMPtr<nsIAuthPromptProvider> mAuthProvider;
   nsCOMPtr<nsIWebSocketChannel> mChannel;
   bool mIPCOpen;
 };
--- a/netwerk/protocol/websocket/nsIWebSocketChannel.idl
+++ b/netwerk/protocol/websocket/nsIWebSocketChannel.idl
@@ -36,20 +36,24 @@
  * the terms of any one of the MPL, the GPL or the LGPL.
  *
  * ***** END LICENSE BLOCK ***** */
 
 interface nsIURI;
 interface nsIInterfaceRequestor;
 interface nsILoadGroup;
 interface nsIWebSocketListener;
+interface nsIInputStream;
 
 #include "nsISupports.idl"
 
-[scriptable, uuid(e8ae0371-c28f-4d61-b257-514e014a4686)]
+/** 
+ *  You probably want nsI{Moz}WebSocket.idl
+ */
+[uuid(bb69e5d7-d9cd-4aab-9abe-98f80cf8b8b8)]
 interface nsIWebSocketChannel : nsISupports
 {
     /**
      * The original URI used to construct the protocol connection. This is used
      * in the case of a redirect or URI "resolution" (e.g. resolving a
      * resource: URI to a file: URI) so that the original pre-redirect
      * URI can still be obtained.  This is never null.
      */
@@ -135,9 +139,17 @@ interface nsIWebSocketChannel : nsISuppo
     void sendMsg(in AUTF8String aMsg);
 
     /**
      * Use to send binary message down the connection to WebSocket peer.
      *
      * @param aMsg the data to send
      */
     void sendBinaryMsg(in ACString aMsg);
+
+    /** 
+     * Use to send a binary stream (Blob) to Websocket peer.
+     *
+     * @param aStream The input stream to be sent.  
+     */
+    void sendBinaryStream(in nsIInputStream aStream, 
+                          in unsigned long length);
 };