Bug 636063, part 3: Honor compression requests when queuing messages. r=bent
author Chris Jones <jones.chris.g@gmail.com>
Sat, 25 Aug 2012 01:25:08 -0700
changeset 105481 34081ff6b8ac8984abff8c0b0f374a8762720cb4
parent 105480 9a3d78f6623c46623d311e660455bb21146f6cdb
child 105482 779bdf71cde512dc03ca698b7d1c715ed80bbd28
push id 55
push user shu@rfrn.org
push date Thu, 30 Aug 2012 01:33:09 +0000
reviewers bent
bugs 636063
milestone 17.0a1
ipc/glue/RPCChannel.cpp
ipc/glue/RPCChannel.h
ipc/ipdl/test/cxx/PTestLatency.ipdl
ipc/ipdl/test/cxx/TestLatency.cpp
ipc/ipdl/test/cxx/TestLatency.h
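
What the patch does: when an incoming async message is declared `compress` in IPDL and the message at the back of mPending has the same type and routing id, RPCChannel::OnMessageReceivedFromLink drops the queued message and enqueues the newer one in its place, so only the most recent of a run of identical messages gets dispatched. mPending also changes from std::queue to std::deque (RPCChannel.h) because the compression check needs back-of-queue removal (pop_back()), which std::queue does not expose. Below is a minimal standalone sketch of that queueing rule, not part of the patch; MockMsg and Enqueue are hypothetical stand-ins for IPC::Message and the channel's receive path.

#include <assert.h>
#include <stdint.h>
#include <deque>

// Hypothetical stand-in for IPC::Message: only the fields the
// compression check looks at.
struct MockMsg {
    uint32_t mType;
    int32_t mRoutingId;
    bool mCompress;
    uint32_t type() const { return mType; }
    int32_t routing_id() const { return mRoutingId; }
    bool compress() const { return mCompress; }
};

typedef std::deque<MockMsg> MessageQueue;

// Same rule as the new code in OnMessageReceivedFromLink: if the
// incoming message asks for compression and the back of the queue is
// the same message type routed to the same actor, replace the queued
// message with the newer one.  Returns true when a queued message was
// compressed away.
static bool Enqueue(MessageQueue& aPending, const MockMsg& aMsg)
{
    bool compress = aMsg.compress() && !aPending.empty() &&
                    aPending.back().type() == aMsg.type() &&
                    aPending.back().routing_id() == aMsg.routing_id();
    if (compress) {
        assert(aPending.back().compress());
        aPending.pop_back();
    }
    aPending.push_back(aMsg);
    return compress;
}

int main()
{
    MessageQueue pending;
    MockMsg first = { 1, 7, true };
    MockMsg second = { 1, 7, true };
    MockMsg other = { 2, 7, false };

    Enqueue(pending, first);
    Enqueue(pending, second);  // same type + routing id: replaces |first|
    Enqueue(pending, other);   // different type: appended normally

    assert(pending.size() == 2);
    return 0;
}

The sketch's return value mirrors the patch's compressMessage flag: when the previous message was compressed away, the DequeueTask already posted for it is reused, so OnMessageReceivedFromLink skips posting a new one to the worker loop.
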
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -190,17 +190,17 @@ RPCChannel::Call(Message* _msg, Message*
         if (!mOutOfTurnReplies.empty() &&
             ((it = mOutOfTurnReplies.find(mStack.top().seqno())) !=
             mOutOfTurnReplies.end())) {
             recvd = it->second;
             mOutOfTurnReplies.erase(it);
         }
         else if (!mPending.empty()) {
             recvd = mPending.front();
-            mPending.pop();
+            mPending.pop_front();
         }
         else {
             // because of subtleties with nested event loops, it's
             // possible that we got here and nothing happened.  or, we
             // might have a deferred in-call that needs to be
             // processed.  either way, we won't break the inner while
             // loop again until something new happens.
             continue;
@@ -305,17 +305,17 @@ RPCChannel::MaybeUndeferIncall()
     // maybe time to process this message
     Message call = mDeferred.top();
     mDeferred.pop();
 
     // fix up fudge factor we added to account for race
     RPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
     --mRemoteStackDepthGuess;
 
-    mPending.push(call);
+    mPending.push_back(call);
 }
 
 void
 RPCChannel::EnqueuePendingMessages()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
 
@@ -377,17 +377,17 @@ RPCChannel::OnMaybeDequeueOne()
 
         if (!mDeferred.empty())
             MaybeUndeferIncall();
 
         if (mPending.empty())
             return false;
 
         recvd = mPending.front();
-        mPending.pop();
+        mPending.pop_front();
     }
 
     if (IsOnCxxStack() && recvd.is_rpc() && recvd.is_reply()) {
         // We probably just received a reply in a nested loop for an
         // RPC call sent before entering that loop.
         mOutOfTurnReplies[recvd.seqno()] = recvd;
         return false;
     }
@@ -572,17 +572,17 @@ RPCChannel::BlockOnParent()
         if (!Connected()) {
             mBlockedOnParent = false;
             ReportConnectionError("RPCChannel");
             break;
         }
 
         if (!mPending.empty()) {
             Message recvd = mPending.front();
-            mPending.pop();
+            mPending.pop_front();
 
             MonitorAutoUnlock unlock(*mMonitor);
 
             CxxStackFrame f(*this, IN_MESSAGE, &recvd);
             if (recvd.is_rpc()) {
                 // stack depth must be 0 here
                 Incall(recvd, 0);
             }
@@ -645,17 +645,17 @@ RPCChannel::DebugAbort(const char* file,
             mPending.size());
 
     MessageQueue pending = mPending;
     while (!pending.empty()) {
         fprintf(stderr, "    [ %s%s ]\n",
                 pending.front().is_rpc() ? "rpc" :
                 (pending.front().is_sync() ? "sync" : "async"),
                 pending.front().is_reply() ? "reply" : "");
-        pending.pop();
+        pending.pop_front();
     }
 
     NS_RUNTIMEABORT(why);
 }
 
 void
 RPCChannel::DumpRPCStack(FILE* outfile, const char* const pfx) const
 {
@@ -696,22 +696,36 @@ RPCChannel::OnMessageReceivedFromLink(co
     // know that it needs to be immediately handled to unblock us.
     if (AwaitingSyncReply() && msg.is_sync()) {
         // wake up worker thread waiting at SyncChannel::Send
         mRecvd = msg;
         NotifyWorkerThread();
         return;
     }
 
-    mPending.push(msg);
+    bool compressMessage = (msg.compress() && !mPending.empty() &&
+                            mPending.back().type() == msg.type() &&
+                            mPending.back().routing_id() == msg.routing_id());
+    if (compressMessage) {
+        // This message type has compression enabled, and the back of
+        // the queue was the same message type and routed to the same
+        // destination.  Replace it with the newer message.
+        MOZ_ASSERT(mPending.back().compress());
+        mPending.pop_back();
+    }
+
+    mPending.push_back(msg);
 
     if (0 == StackDepth() && !mBlockedOnParent) {
         // the worker thread might be idle, make sure it wakes up
-        mWorkerLoop->PostTask(FROM_HERE,
-                                     new DequeueTask(mDequeueOneTask));
+        if (!compressMessage) {
+            // If we compressed away the previous message, we'll reuse
+            // its pending task.
+            mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
+        }
     }
     else if (!AwaitingSyncReply())
         NotifyWorkerThread();
 }
 
 
 void
 RPCChannel::OnChannelErrorFromLink()
--- a/ipc/glue/RPCChannel.h
+++ b/ipc/glue/RPCChannel.h
@@ -4,18 +4,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef ipc_glue_RPCChannel_h
 #define ipc_glue_RPCChannel_h 1
 
 #include <stdio.h>
 
-// FIXME/cjones probably shouldn't depend on STL
-#include <queue>
+#include <deque>
 #include <stack>
 #include <vector>
 
 #include "base/basictypes.h"
 
 #include "nsAtomicRefcnt.h"
 
 #include "mozilla/ipc/SyncChannel.h"
@@ -333,17 +332,17 @@ private:
     // |?{mStack.size() == 1}|, then other side "finished with us,"
     // and went back to its own business.  That business might have
     // included sending any number of async message |A<*| until
     // sending a blocking message |(S< | C<)|.  If we had more than
     // one RPC call on our stack, the other side *better* not have
     // sent us another blocking message, because it's blocked on a
     // reply from us.
     //
-    typedef std::queue<Message> MessageQueue;
+    typedef std::deque<Message> MessageQueue;
     MessageQueue mPending;
 
     // 
     // Stack of all the RPC out-calls on which this RPCChannel is
     // awaiting a response.
     //
     std::stack<Message> mStack;
 
--- a/ipc/ipdl/test/cxx/PTestLatency.ipdl
+++ b/ipc/ipdl/test/cxx/PTestLatency.ipdl
@@ -7,16 +7,19 @@ rpc protocol PTestLatency {
 
 child:
     __delete__();
     Ping();
     Ping5();
     rpc Rpc();
     Spam();
     rpc Synchro();
+    CompressedSpam(uint32_t seqno) compress;
+    rpc Synchro2() returns (uint32_t lastSeqno,
+                            uint32_t numMessagesDispatched);
 
 parent:
     Pong();
     Pong5();
 
 state START:
     // if the timing resolution is too low, abort the test
     send __delete__;
@@ -47,19 +50,26 @@ state PONG3: recv Pong5 goto PONG4;
 state PONG4: recv Pong5 goto PONG5;
 state PONG5: recv Pong5 goto PING5;
 
     // Trial 3: lotsa RPC
 state RPC:
     call Rpc goto RPC;
     send Spam goto SPAM;
 
+    // Trial 4: lots of sequential async messages, which tests pipelining
 state SPAM:
     send Spam goto SPAM;
-    call Synchro goto DONE;
+    call Synchro goto COMPRESSED_SPAM;
+
+    // Trial 5: lots of async spam, but compressed to cut down on
+    // dispatch overhead
+state COMPRESSED_SPAM:          // compressed spam, mmm
+    send CompressedSpam goto COMPRESSED_SPAM;
+    call Synchro2 goto DONE;
 
 state DONE:
     send __delete__;
 };
 
 
 } // namespace mozilla
 } // namespace _ipdltest
--- a/ipc/ipdl/test/cxx/TestLatency.cpp
+++ b/ipc/ipdl/test/cxx/TestLatency.cpp
@@ -14,17 +14,17 @@ namespace _ipdltest {
 
 TestLatencyParent::TestLatencyParent() :
     mStart(),
     mPPTimeTotal(),
     mPP5TimeTotal(),
     mRpcTimeTotal(),
     mPPTrialsToGo(NR_TRIALS),
     mPP5TrialsToGo(NR_TRIALS),
-    mSpamsToGo(NR_TRIALS)
+    mNumChildProcessedCompressedSpams(0)
 {
     MOZ_COUNT_CTOR(TestLatencyParent);
 }
 
 TestLatencyParent::~TestLatencyParent()
 {
     MOZ_COUNT_DTOR(TestLatencyParent);
 }
@@ -137,29 +137,57 @@ TestLatencyParent::SpamTrial()
     // been processed.  This adds the overhead of a reply message from
     // child-->here, but should be insignificant compared to >>
     // NR_SPAMS.
     if (!CallSynchro())
         fail("calling Synchro()");
 
     mSpamTimeTotal = (TimeStamp::Now() - start);
 
+    CompressedSpamTrial();
+}
+
+void
+TestLatencyParent::CompressedSpamTrial()
+{
+    for (int i = 0; i < NR_SPAMS; ++i) {
+        if (!SendCompressedSpam(i + 1))
+            fail("sending CompressedSpam()");
+        if (0 == (i % 10000))
+            printf("  CompressedSpam trial %d\n", i);
+    }
+
+    uint32_t lastSeqno;
+    if (!CallSynchro2(&lastSeqno, &mNumChildProcessedCompressedSpams))
+        fail("calling Synchro2()");
+
+    if (lastSeqno != NR_SPAMS)
+        fail("last seqno was %u, expected %u", lastSeqno, NR_SPAMS);
+
+    // NB: since this is testing an optimization, it's somewhat bogus.
+    // Need to make this a warning if it actually intermittently fails in
+    // practice, which is doubtful.
+    if (!(mNumChildProcessedCompressedSpams < NR_SPAMS))
+        fail("Didn't compress any messages?");
+
     Exit();
 }
 
 void
 TestLatencyParent::Exit()
 {
     Close();
 }
 
 //-----------------------------------------------------------------------------
 // child
 
 TestLatencyChild::TestLatencyChild()
+    : mLastSeqno(0)
+    , mNumProcessedCompressedSpams(0)
 {
     MOZ_COUNT_CTOR(TestLatencyChild);
 }
 
 TestLatencyChild::~TestLatencyChild()
 {
     MOZ_COUNT_DTOR(TestLatencyChild);
 }
@@ -201,10 +229,30 @@ TestLatencyChild::RecvSpam()
 }
 
 bool
 TestLatencyChild::AnswerSynchro()
 {
     return true;
 }
 
+bool
+TestLatencyChild::RecvCompressedSpam(const uint32_t& seqno)
+{
+    if (seqno <= mLastSeqno)
+        fail("compressed seqnos must monotonically increase");
+
+    mLastSeqno = seqno;
+    ++mNumProcessedCompressedSpams;
+    return true;
+}
+
+bool
+TestLatencyChild::AnswerSynchro2(uint32_t* lastSeqno,
+                                 uint32_t* numMessagesDispatched)
+{
+    *lastSeqno = mLastSeqno;
+    *numMessagesDispatched = mNumProcessedCompressedSpams;
+    return true;
+}
+
 } // namespace _ipdltest
 } // namespace mozilla
--- a/ipc/ipdl/test/cxx/TestLatency.h
+++ b/ipc/ipdl/test/cxx/TestLatency.h
@@ -4,17 +4,17 @@
 #include "mozilla/_ipdltest/IPDLUnitTests.h"
 
 #include "mozilla/_ipdltest/PTestLatencyParent.h"
 #include "mozilla/_ipdltest/PTestLatencyChild.h"
 
 #include "mozilla/TimeStamp.h"
 
 #define NR_TRIALS 10000
-#define NR_SPAMS  50000
+#define NR_SPAMS  25000
 
 namespace mozilla {
 namespace _ipdltest {
 
 class TestLatencyParent :
     public PTestLatencyParent
 {
 private:
@@ -38,64 +38,74 @@ protected:
     {
         if (NormalShutdown != why)
             fail("unexpected destruction!");  
 
         passed("\n"
                "  average #ping-pong/sec:        %g\n"
                "  average #ping5-pong5/sec:      %g\n"
                "  average #RPC call-answer/sec:  %g\n"
-               "  average #spams/sec:            %g\n",
+               "  average #spams/sec:            %g\n"
+               "  pct. spams compressed away:    %g\n",
                double(NR_TRIALS) / mPPTimeTotal.ToSecondsSigDigits(),
                double(NR_TRIALS) / mPP5TimeTotal.ToSecondsSigDigits(),
                double(NR_TRIALS) / mRpcTimeTotal.ToSecondsSigDigits(),
-               double(NR_SPAMS) / mSpamTimeTotal.ToSecondsSigDigits());
+               double(NR_SPAMS) / mSpamTimeTotal.ToSecondsSigDigits(),
+               100.0 * (double(NR_SPAMS - mNumChildProcessedCompressedSpams) /
+                        double(NR_SPAMS)));
 
         QuitParent();
     }
 
 private:
     void PingPongTrial();
     void Ping5Pong5Trial();
     void RpcTrials();
     void SpamTrial();
+    void CompressedSpamTrial();
     void Exit();
 
     TimeStamp mStart;
     TimeDuration mPPTimeTotal;
     TimeDuration mPP5TimeTotal;
     TimeDuration mRpcTimeTotal;
     TimeDuration mSpamTimeTotal;
 
     int mPPTrialsToGo;
     int mPP5TrialsToGo;
-    int mSpamsToGo;
+    uint32_t mNumChildProcessedCompressedSpams;
 };
 
 
 class TestLatencyChild :
     public PTestLatencyChild
 {
 public:
     TestLatencyChild();
     virtual ~TestLatencyChild();
 
 protected:
     virtual bool RecvPing() MOZ_OVERRIDE;
     virtual bool RecvPing5() MOZ_OVERRIDE;
     virtual bool AnswerRpc() MOZ_OVERRIDE;
     virtual bool RecvSpam() MOZ_OVERRIDE;
     virtual bool AnswerSynchro() MOZ_OVERRIDE;
+    virtual bool RecvCompressedSpam(const uint32_t& seqno) MOZ_OVERRIDE;
+    virtual bool AnswerSynchro2(uint32_t* lastSeqno,
+                                uint32_t* numMessagesDispatched) MOZ_OVERRIDE;
 
     virtual void ActorDestroy(ActorDestroyReason why) MOZ_OVERRIDE
     {
         if (NormalShutdown != why)
             fail("unexpected destruction!");
         QuitChild();
     }
+
+    uint32_t mLastSeqno;
+    uint32_t mNumProcessedCompressedSpams;
 };
 
 
 } // namespace _ipdltest
 } // namespace mozilla
 
 
 #endif // ifndef mozilla__ipdltest_TestLatency_h