bug 756551 spdy stream index integrity checks r=honzab a=akebyl
authorPatrick McManus <mcmanus@ducksong.com>
Mon, 21 May 2012 17:09:25 -0400
changeset 95830 eeb44575b30cc296966ae2b889f807537cbab59b
parent 95829 af9be77989b64376c187a3eae6d871df9264a10f
child 95831 65b510f3440fd9f80955e656d38a5349011c48f4
push id886
push userlsblakk@mozilla.com
push dateMon, 04 Jun 2012 19:57:52 +0000
treeherdermozilla-beta@bbd8d5efd6d1 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewershonzab, akebyl
bugs756551
milestone14.0a2
bug 756551 spdy stream index integrity checks r=honzab a=akebyl
netwerk/protocol/http/SpdySession.cpp
netwerk/protocol/http/SpdySession.h
netwerk/protocol/http/SpdyStream.cpp
--- a/netwerk/protocol/http/SpdySession.cpp
+++ b/netwerk/protocol/http/SpdySession.cpp
@@ -346,28 +346,43 @@ SpdySession::RegisterStreamID(SpdyStream
   mNextStreamID += 2;
 
   // We've used up plenty of ID's on this session. Start
   // moving to a new one before there is a crunch involving
   // server push streams or concurrent non-registered submits
   if (mNextStreamID >= kMaxStreamID)
     mShouldGoAway = true;
 
+  // integrity check
+  if (mStreamIDHash.Get(result)) {
+    LOG3(("   New ID already present\n"));
+    NS_ABORT_IF_FALSE(false, "New ID already present in mStreamIDHash");
+    mShouldGoAway = true;
+    return kDeadStreamID;
+  }
+
   mStreamIDHash.Put(result, stream);
   return result;
 }
 
 bool
 SpdySession::AddStream(nsAHttpTransaction *aHttpTransaction,
                        PRInt32 aPriority)
 {
   NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "wrong thread");
   NS_ABORT_IF_FALSE(!mStreamTransactionHash.Get(aHttpTransaction),
                     "AddStream duplicate transaction pointer");
 
+  // integrity check
+  if (mStreamTransactionHash.Get(aHttpTransaction)) {
+    LOG3(("   New transaction already present\n"));
+    NS_ABORT_IF_FALSE(false, "New transaction already present in hash");
+    return false;
+  }
+
   aHttpTransaction->SetConnection(this);
   SpdyStream *stream = new SpdyStream(aHttpTransaction,
                                       this,
                                       mSocketTransport,
                                       mSendingChunkSize,
                                       &mUpstreamZlib,
                                       aPriority);
 
@@ -846,24 +861,82 @@ SpdySession::GenerateGoAway()
   packet[7] = 4;                                  /* data length */
   
   // last-good-stream-id are bytes 8-11, when we accept server push this will
   // need to be set non zero
 
   FlushOutputQueue();
 }
 
+// perform a bunch of integrity checks on the stream.
+// returns true if passed, false (plus LOG and ABORT) if failed.
+bool
+SpdySession::VerifyStream(SpdyStream *aStream, PRUint32 aOptionalID = 0)
+{
+  // This is annoying, but at least it is O(1)
+  NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "wrong thread");
+
+  if (!aStream)
+    return true;
+
+  PRUint32 test = 0;
+  
+  do {
+    if (aStream->StreamID() == kDeadStreamID)
+      break;
+
+    nsAHttpTransaction *trans = aStream->Transaction();
+
+    test++;  
+    if (!trans)
+      break;
+
+    test++;
+    if (mStreamTransactionHash.Get(trans) != aStream)
+      break;
+    
+    if (aStream->StreamID()) {
+      SpdyStream *idStream = mStreamIDHash.Get(aStream->StreamID());
+
+      test++;
+      if (idStream != aStream)
+        break;
+
+      if (aOptionalID) {
+        test++;
+        if (idStream->StreamID() != aOptionalID)
+          break;
+      }
+    }
+
+    // tests passed
+    return true;
+  } while (0);
+
+  LOG(("SpdySession %p VerifyStream Failure %p stream->id=0x%x "
+       "optionalID=0x%x trans=%p test=%d\n",
+       this, aStream, aStream->StreamID(),
+       aOptionalID, aStream->Transaction(), test));
+  NS_ABORT_IF_FALSE(false, "VerifyStream");
+  return false;
+}
+
 void
 SpdySession::CleanupStream(SpdyStream *aStream, nsresult aResult,
                            rstReason aResetCode)
 {
   NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "wrong thread");
   LOG3(("SpdySession::CleanupStream %p %p 0x%x %X\n",
         this, aStream, aStream->StreamID(), aResult));
 
+  if (!VerifyStream(aStream)) {
+    LOG(("SpdySession::CleanupStream failed to verify stream\n"));
+    return;
+  }
+
   if (!aStream->RecvdFin() && aStream->StreamID()) {
     LOG3(("Stream had not processed recv FIN, sending RST code %X\n",
           aResetCode));
     GenerateRstStream(aResetCode, aStream->StreamID());
     --mConcurrent;
     ProcessPending();
   }
   
@@ -962,16 +1035,29 @@ SpdySession::HandleSynStream(SpdySession
 
   // todo populate cache. For now, just reject server push p3
   self->GenerateRstStream(RST_REFUSED_STREAM, streamID);
   self->ResetDownstreamState();
   return NS_OK;
 }
 
 nsresult
+SpdySession::SetInputFrameDataStream(PRUint32 streamID)
+{
+  mInputFrameDataStream = mStreamIDHash.Get(streamID);
+  if (VerifyStream(mInputFrameDataStream, streamID))
+    return NS_OK;
+
+  LOG(("SpdySession::SetInputFrameDataStream failed to verify 0x%X\n",
+       streamID));
+  mInputFrameDataStream = nsnull;
+  return NS_ERROR_UNEXPECTED;
+}
+
+nsresult
 SpdySession::HandleSynReply(SpdySession *self)
 {
   NS_ABORT_IF_FALSE(self->mFrameControlType == CONTROL_TYPE_SYN_REPLY,
                     "wrong control type");
 
   if (self->mInputFrameDataSize < 8) {
     LOG3(("SpdySession::HandleSynReply %p SYN REPLY too short data=%d",
           self, self->mInputFrameDataSize));
@@ -985,31 +1071,36 @@ SpdySession::HandleSynReply(SpdySession 
   // the session becuase the session compression context will become
   // inconsistent if all of the compressed data is not processed.
   if (NS_FAILED(self->DownstreamUncompress(self->mInputFrameBuffer + 14,
                                            self->mInputFrameDataSize - 6))) {
     LOG(("SpdySession::HandleSynReply uncompress failed\n"));
     return NS_ERROR_FAILURE;
   }
 
+  LOG3(("SpdySession::HandleSynReply %p lookup via streamID in syn_reply.\n",
+        self));
   PRUint32 streamID =
     PR_ntohl(reinterpret_cast<PRUint32 *>(self->mInputFrameBuffer.get())[2]);
-  self->mInputFrameDataStream = self->mStreamIDHash.Get(streamID);
+  nsresult rv = self->SetInputFrameDataStream(streamID);
+  if (NS_FAILED(rv))
+    return rv;
+
   if (!self->mInputFrameDataStream) {
     LOG3(("SpdySession::HandleSynReply %p lookup streamID in syn_reply "
           "0x%X failed. NextStreamID = 0x%x", self, streamID,
           self->mNextStreamID));
     if (streamID >= self->mNextStreamID)
       self->GenerateRstStream(RST_INVALID_STREAM, streamID);
 
     self->ResetDownstreamState();
     return NS_OK;
   }
 
-  nsresult rv = self->HandleSynReplyForValidStream();
+  rv = self->HandleSynReplyForValidStream();
   if (rv == NS_ERROR_ILLEGAL_VALUE) {
     LOG3(("SpdySession::HandleSynReply %p PROTOCOL_ERROR detected 0x%X\n",
           self, streamID));
     self->CleanupStream(self->mInputFrameDataStream, rv, RST_PROTOCOL_ERROR);
     self->ResetDownstreamState();
     rv = NS_OK;
   }
 
@@ -1109,20 +1200,27 @@ SpdySession::HandleRstStream(SpdySession
   
   if (self->mDownstreamRstReason == RST_INVALID_STREAM ||
       self->mDownstreamRstReason == RST_FLOW_CONTROL_ERROR) {
     // basically just ignore this
     self->ResetDownstreamState();
     return NS_OK;
   }
 
-  self->mInputFrameDataStream = self->mStreamIDHash.Get(streamID);
+  nsresult rv = self->SetInputFrameDataStream(streamID);
+  
   if (!self->mInputFrameDataStream) {
+    if (NS_FAILED(rv))
+      LOG(("SpdySession::HandleRstStream %p lookup streamID for RST Frame "
+           "0x%X failed reason = %d :: VerifyStream Failed\n", self, streamID,
+           self->mDownstreamRstReason));
+
     LOG3(("SpdySession::HandleRstStream %p lookup streamID for RST Frame "
-          "0x%X failed", self, streamID));
+          "0x%X failed reason = %d", self, streamID,
+          self->mDownstreamRstReason));
     return NS_ERROR_ILLEGAL_VALUE;
   }
 
   self->ChangeDownstreamState(PROCESSING_CONTROL_RST_STREAM);
   return NS_OK;
 }
 
 nsresult
@@ -1574,17 +1672,22 @@ SpdySession::WriteSegments(nsAHttpSegmen
         return NS_ERROR_ILLEGAL_VALUE;
       }
     }
     else {
       ChangeDownstreamState(PROCESSING_DATA_FRAME);
 
       PRUint32 streamID =
         PR_ntohl(reinterpret_cast<PRUint32 *>(mInputFrameBuffer.get())[0]);
-      mInputFrameDataStream = mStreamIDHash.Get(streamID);
+      rv = SetInputFrameDataStream(streamID);
+      if (NS_FAILED(rv)) {
+        LOG(("SpdySession::WriteSegments %p lookup streamID 0x%X failed. "
+              "probably due to verification.\n", this, streamID));
+        return rv;
+      }
       if (!mInputFrameDataStream) {
         LOG3(("SpdySession::WriteSegments %p lookup streamID 0x%X failed. "
               "Next = 0x%x", this, streamID, mNextStreamID));
         if (streamID >= mNextStreamID)
           GenerateRstStream(RST_INVALID_STREAM, streamID);
         ChangeDownstreamState(DISCARDING_DATA_FRAME);
       }
       mInputFrameDataLast = (mInputFrameBuffer[4] & kFlag_Data_FIN);
@@ -1616,16 +1719,19 @@ SpdySession::WriteSegments(nsAHttpSegmen
 
     if (mDownstreamRstReason != RST_REFUSED_STREAM &&
         mDownstreamRstReason != RST_CANCEL)
       mShouldGoAway = true;
 
     // mInputFrameDataStream is reset by ChangeDownstreamState
     SpdyStream *stream = mInputFrameDataStream;
     ResetDownstreamState();
+    LOG3(("SpdySession::WriteSegments cleanup stream on recv of rst "
+          "session=%p stream=%p 0x%X\n", this, stream,
+          stream ? stream->StreamID() : 0));
     CleanupStream(stream, rv, RST_CANCEL);
     return NS_OK;
   }
 
   if (mDownstreamState == PROCESSING_DATA_FRAME ||
       mDownstreamState == PROCESSING_CONTROL_SYN_REPLY) {
 
     mSegmentWriter = writer;
@@ -1635,22 +1741,30 @@ SpdySession::WriteSegments(nsAHttpSegmen
     mLastDataReadEpoch = mLastReadEpoch;
 
     if (rv == NS_BASE_STREAM_CLOSED) {
       // This will happen when the transaction figures out it is EOF, generally
       // due to a content-length match being made
       SpdyStream *stream = mInputFrameDataStream;
       if (mInputFrameDataRead == mInputFrameDataSize)
         ResetDownstreamState();
+      LOG3(("SpdySession::WriteSegments session=%p stream=%p 0x%X "
+            "needscleanup=%p. cleanup stream based on "
+            "stream->writeSegments returning BASE_STREAM_CLOSED\n",
+            this, stream, stream ? stream->StreamID() : 0,
+            mNeedsCleanup));
       CleanupStream(stream, NS_OK, RST_CANCEL);
       NS_ABORT_IF_FALSE(!mNeedsCleanup, "double cleanup out of data frame");
       return NS_OK;
     }
     
     if (mNeedsCleanup) {
+      LOG3(("SpdySession::WriteSegments session=%p stream=%p 0x%X "
+            "cleanup stream based on mNeedsCleanup.\n",
+            this, mNeedsCleanup, mNeedsCleanup ? mNeedsCleanup->StreamID() : 0));
       CleanupStream(mNeedsCleanup, NS_OK, RST_CANCEL);
       mNeedsCleanup = nsnull;
     }
 
     // In v3 this is where we would generate a window update
 
     return rv;
   }
@@ -1967,17 +2081,17 @@ SpdySession::TransactionHasDataToWrite(n
 {
   NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "wrong thread");
   LOG3(("SpdySession::TransactionHasDataToWrite %p trans=%p", this, caller));
 
   // a trapped signal from the http transaction to the connection that
   // it is no longer blocked on read.
 
   SpdyStream *stream = mStreamTransactionHash.Get(caller);
-  if (!stream) {
+  if (!stream || !VerifyStream(stream)) {
     LOG3(("SpdySession::TransactionHasDataToWrite %p caller %p not found",
           this, caller));
     return;
   }
   
   LOG3(("SpdySession::TransactionHasDataToWrite %p ID is %x",
         this, stream->StreamID()));
 
--- a/netwerk/protocol/http/SpdySession.h
+++ b/netwerk/protocol/http/SpdySession.h
@@ -154,16 +154,20 @@ public:
   const static PRUint32 kQueueMinimumCleanup = 8192;
   const static PRUint32 kQueueTailRoom    =  4096;
   const static PRUint32 kQueueReserved    =  1024;
 
   const static PRUint32 kSendingChunkSize = 4096;
   const static PRUint32 kDefaultMaxConcurrent = 100;
   const static PRUint32 kMaxStreamID = 0x7800000;
   
+  // This is a sentinel for a deleted stream. It is not a valid
+  // 31 bit stream ID.
+  const static PRUint32 kDeadStreamID = 0xffffdead;
+  
   static nsresult HandleSynStream(SpdySession *);
   static nsresult HandleSynReply(SpdySession *);
   static nsresult HandleRstStream(SpdySession *);
   static nsresult HandleSettings(SpdySession *);
   static nsresult HandleNoop(SpdySession *);
   static nsresult HandlePing(SpdySession *);
   static nsresult HandleGoAway(SpdySession *);
   static nsresult HandleHeaders(SpdySession *);
@@ -213,16 +217,18 @@ private:
   void        CleanupStream(SpdyStream *, nsresult, rstReason);
 
   void        SetWriteCallbacks();
   void        FlushOutputQueue();
 
   bool        RoomForMoreConcurrent();
   void        ActivateStream(SpdyStream *);
   void        ProcessPending();
+  nsresult    SetInputFrameDataStream(PRUint32);
+  bool        VerifyStream(SpdyStream *, PRUint32);
 
   // a wrapper for all calls to the nshttpconnection level segment writer. Used
   // to track network I/O for timeout purposes
   nsresult   NetworkRead(nsAHttpSegmentWriter *, char *, PRUint32, PRUint32 *);
   
   static PLDHashOperator ShutdownEnumerator(nsAHttpTransaction *,
                                             nsAutoPtr<SpdyStream> &,
                                             void *);
--- a/netwerk/protocol/http/SpdyStream.cpp
+++ b/netwerk/protocol/http/SpdyStream.cpp
@@ -88,16 +88,17 @@ SpdyStream::SpdyStream(nsAHttpTransactio
 
   LOG3(("SpdyStream::SpdyStream %p", this));
 
   mTxInlineFrame = new char[mTxInlineFrameSize];
 }
 
 SpdyStream::~SpdyStream()
 {
+  mStreamID = SpdySession::kDeadStreamID;
 }
 
 // ReadSegments() is used to write data down the socket. Generally, HTTP
 // request data is pulled from the approriate transaction and
 // converted to SPDY data. Sometimes control data like a window-update is
 // generated instead.
 
 nsresult