Bug 636063, part 3: Honor compression requests when queuing messages. r=bent
authorChris Jones <jones.chris.g@gmail.com>
Sat, 25 Aug 2012 01:25:08 -0700
changeset 103448 34081ff6b8ac8984abff8c0b0f374a8762720cb4
parent 103447 9a3d78f6623c46623d311e660455bb21146f6cdb
child 103449 779bdf71cde512dc03ca698b7d1c715ed80bbd28
push id23348
push userryanvm@gmail.com
push dateSun, 26 Aug 2012 02:09:16 +0000
treeherdermozilla-central@b3cce81fef1a [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersbent
bugs636063
milestone17.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 636063, part 3: Honor compression requests when queuing messages. r=bent
ipc/glue/RPCChannel.cpp
ipc/glue/RPCChannel.h
ipc/ipdl/test/cxx/PTestLatency.ipdl
ipc/ipdl/test/cxx/TestLatency.cpp
ipc/ipdl/test/cxx/TestLatency.h
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -190,17 +190,17 @@ RPCChannel::Call(Message* _msg, Message*
         if (!mOutOfTurnReplies.empty() &&
             ((it = mOutOfTurnReplies.find(mStack.top().seqno())) !=
             mOutOfTurnReplies.end())) {
             recvd = it->second;
             mOutOfTurnReplies.erase(it);
         }
         else if (!mPending.empty()) {
             recvd = mPending.front();
-            mPending.pop();
+            mPending.pop_front();
         }
         else {
             // because of subtleties with nested event loops, it's
             // possible that we got here and nothing happened.  or, we
             // might have a deferred in-call that needs to be
             // processed.  either way, we won't break the inner while
             // loop again until something new happens.
             continue;
@@ -305,17 +305,17 @@ RPCChannel::MaybeUndeferIncall()
     // maybe time to process this message
     Message call = mDeferred.top();
     mDeferred.pop();
 
     // fix up fudge factor we added to account for race
     RPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
     --mRemoteStackDepthGuess;
 
-    mPending.push(call);
+    mPending.push_back(call);
 }
 
 void
 RPCChannel::EnqueuePendingMessages()
 {
     AssertWorkerThread();
     mMonitor->AssertCurrentThreadOwns();
 
@@ -377,17 +377,17 @@ RPCChannel::OnMaybeDequeueOne()
 
         if (!mDeferred.empty())
             MaybeUndeferIncall();
 
         if (mPending.empty())
             return false;
 
         recvd = mPending.front();
-        mPending.pop();
+        mPending.pop_front();
     }
 
     if (IsOnCxxStack() && recvd.is_rpc() && recvd.is_reply()) {
         // We probably just received a reply in a nested loop for an
         // RPC call sent before entering that loop.
         mOutOfTurnReplies[recvd.seqno()] = recvd;
         return false;
     }
@@ -572,17 +572,17 @@ RPCChannel::BlockOnParent()
         if (!Connected()) {
             mBlockedOnParent = false;
             ReportConnectionError("RPCChannel");
             break;
         }
 
         if (!mPending.empty()) {
             Message recvd = mPending.front();
-            mPending.pop();
+            mPending.pop_front();
 
             MonitorAutoUnlock unlock(*mMonitor);
 
             CxxStackFrame f(*this, IN_MESSAGE, &recvd);
             if (recvd.is_rpc()) {
                 // stack depth must be 0 here
                 Incall(recvd, 0);
             }
@@ -645,17 +645,17 @@ RPCChannel::DebugAbort(const char* file,
             mPending.size());
 
     MessageQueue pending = mPending;
     while (!pending.empty()) {
         fprintf(stderr, "    [ %s%s ]\n",
                 pending.front().is_rpc() ? "rpc" :
                 (pending.front().is_sync() ? "sync" : "async"),
                 pending.front().is_reply() ? "reply" : "");
-        pending.pop();
+        pending.pop_front();
     }
 
     NS_RUNTIMEABORT(why);
 }
 
 void
 RPCChannel::DumpRPCStack(FILE* outfile, const char* const pfx) const
 {
@@ -696,22 +696,36 @@ RPCChannel::OnMessageReceivedFromLink(co
     // know that it needs to be immediately handled to unblock us.
     if (AwaitingSyncReply() && msg.is_sync()) {
         // wake up worker thread waiting at SyncChannel::Send
         mRecvd = msg;
         NotifyWorkerThread();
         return;
     }
 
-    mPending.push(msg);
+    bool compressMessage = (msg.compress() && !mPending.empty() &&
+                            mPending.back().type() == msg.type() &&
+                            mPending.back().routing_id() == msg.routing_id());
+    if (compressMessage) {
+        // This message type has compression enabled, and the back of
+        // the queue was the same message type and routed to the same
+        // destination.  Replace it with the newer message.
+        MOZ_ASSERT(mPending.back().compress());
+        mPending.pop_back();
+    }
+
+    mPending.push_back(msg);
 
     if (0 == StackDepth() && !mBlockedOnParent) {
         // the worker thread might be idle, make sure it wakes up
-        mWorkerLoop->PostTask(FROM_HERE,
-                                     new DequeueTask(mDequeueOneTask));
+        if (!compressMessage) {
+            // If we compressed away the previous message, we'll reuse
+            // its pending task.
+            mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
+        }
     }
     else if (!AwaitingSyncReply())
         NotifyWorkerThread();
 }
 
 
 void
 RPCChannel::OnChannelErrorFromLink()
--- a/ipc/glue/RPCChannel.h
+++ b/ipc/glue/RPCChannel.h
@@ -4,18 +4,17 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef ipc_glue_RPCChannel_h
 #define ipc_glue_RPCChannel_h 1
 
 #include <stdio.h>
 
-// FIXME/cjones probably shouldn't depend on STL
-#include <queue>
+#include <deque>
 #include <stack>
 #include <vector>
 
 #include "base/basictypes.h"
 
 #include "nsAtomicRefcnt.h"
 
 #include "mozilla/ipc/SyncChannel.h"
@@ -333,17 +332,17 @@ private:
     // |?{mStack.size() == 1}|, then other side "finished with us,"
     // and went back to its own business.  That business might have
     // included sending any number of async message |A<*| until
     // sending a blocking message |(S< | C<)|.  If we had more than
     // one RPC call on our stack, the other side *better* not have
     // sent us another blocking message, because it's blocked on a
     // reply from us.
     //
-    typedef std::queue<Message> MessageQueue;
+    typedef std::deque<Message> MessageQueue;
     MessageQueue mPending;
 
     // 
     // Stack of all the RPC out-calls on which this RPCChannel is
     // awaiting a response.
     //
     std::stack<Message> mStack;
 
--- a/ipc/ipdl/test/cxx/PTestLatency.ipdl
+++ b/ipc/ipdl/test/cxx/PTestLatency.ipdl
@@ -7,16 +7,19 @@ rpc protocol PTestLatency {
 
 child:
     __delete__();
     Ping();
     Ping5();
     rpc Rpc();
     Spam();
     rpc Synchro();
+    CompressedSpam(uint32_t seqno) compress;
+    rpc Synchro2() returns (uint32_t lastSeqno,
+                            uint32_t numMessagesDispatched);
 
 parent:
     Pong();
     Pong5();
 
 state START:
     // if the timing resolution is too low, abort the test
     send __delete__;
@@ -47,19 +50,26 @@ state PONG3: recv Pong5 goto PONG4;
 state PONG4: recv Pong5 goto PONG5;
 state PONG5: recv Pong5 goto PING5;
 
     // Trial 3: lotsa RPC
 state RPC:
     call Rpc goto RPC;
     send Spam goto SPAM;
 
+    // Trial 4: lots of sequential asyn messages, which tests pipelining
 state SPAM:
     send Spam goto SPAM;
-    call Synchro goto DONE;
+    call Synchro goto COMPRESSED_SPAM;
+
+    // Trial 5: lots of async spam, but compressed to cut down on
+    // dispatch overhead
+state COMPRESSED_SPAM:          // compressed spam, mmm
+    send CompressedSpam goto COMPRESSED_SPAM;
+    call Synchro2 goto DONE;
 
 state DONE:
     send __delete__;
 };
 
 
 } // namespace mozilla
 } // namespace _ipdltest
--- a/ipc/ipdl/test/cxx/TestLatency.cpp
+++ b/ipc/ipdl/test/cxx/TestLatency.cpp
@@ -14,17 +14,17 @@ namespace _ipdltest {
 
 TestLatencyParent::TestLatencyParent() :
     mStart(),
     mPPTimeTotal(),
     mPP5TimeTotal(),
     mRpcTimeTotal(),
     mPPTrialsToGo(NR_TRIALS),
     mPP5TrialsToGo(NR_TRIALS),
-    mSpamsToGo(NR_TRIALS)
+    mNumChildProcessedCompressedSpams(0)
 {
     MOZ_COUNT_CTOR(TestLatencyParent);
 }
 
 TestLatencyParent::~TestLatencyParent()
 {
     MOZ_COUNT_DTOR(TestLatencyParent);
 }
@@ -137,29 +137,57 @@ TestLatencyParent::SpamTrial()
     // been processed.  This adds the overhead of a reply message from
     // child-->here, but should be insignificant compared to >>
     // NR_SPAMS.
     if (!CallSynchro())
         fail("calling Synchro()");
 
     mSpamTimeTotal = (TimeStamp::Now() - start);
 
+    CompressedSpamTrial();
+}
+
+void
+TestLatencyParent::CompressedSpamTrial()
+{
+    for (int i = 0; i < NR_SPAMS; ++i) {
+        if (!SendCompressedSpam(i + 1))
+            fail("sending CompressedSpam()");
+        if (0 == (i % 10000))
+            printf("  CompressedSpam trial %d\n", i);
+    }
+
+    uint32_t lastSeqno;
+    if (!CallSynchro2(&lastSeqno, &mNumChildProcessedCompressedSpams))
+        fail("calling Synchro2()");
+
+    if (lastSeqno != NR_SPAMS)
+        fail("last seqno was %u, expected %u", lastSeqno, NR_SPAMS);
+
+    // NB: since this is testing an optimization, it's somewhat bogus.
+    // Need to make a warning if it actually intermittently fails in
+    // practice, which is doubtful.
+    if (!(mNumChildProcessedCompressedSpams < NR_SPAMS))
+        fail("Didn't compress any messages?");
+
     Exit();
 }
 
 void
 TestLatencyParent::Exit()
 {
     Close();
 }
 
 //-----------------------------------------------------------------------------
 // child
 
 TestLatencyChild::TestLatencyChild()
+    : mLastSeqno(0)
+    , mNumProcessedCompressedSpams(0)
 {
     MOZ_COUNT_CTOR(TestLatencyChild);
 }
 
 TestLatencyChild::~TestLatencyChild()
 {
     MOZ_COUNT_DTOR(TestLatencyChild);
 }
@@ -201,10 +229,30 @@ TestLatencyChild::RecvSpam()
 }
 
 bool
 TestLatencyChild::AnswerSynchro()
 {
     return true;
 }
 
+bool
+TestLatencyChild::RecvCompressedSpam(const uint32_t& seqno)
+{
+    if (seqno <= mLastSeqno)
+        fail("compressed seqnos must monotonically increase");
+
+    mLastSeqno = seqno;
+    ++mNumProcessedCompressedSpams;
+    return true;
+}
+
+bool
+TestLatencyChild::AnswerSynchro2(uint32_t* lastSeqno,
+                                 uint32_t* numMessagesDispatched)
+{
+    *lastSeqno = mLastSeqno;
+    *numMessagesDispatched = mNumProcessedCompressedSpams;
+    return true;
+}
+
 } // namespace _ipdltest
 } // namespace mozilla
--- a/ipc/ipdl/test/cxx/TestLatency.h
+++ b/ipc/ipdl/test/cxx/TestLatency.h
@@ -4,17 +4,17 @@
 #include "mozilla/_ipdltest/IPDLUnitTests.h"
 
 #include "mozilla/_ipdltest/PTestLatencyParent.h"
 #include "mozilla/_ipdltest/PTestLatencyChild.h"
 
 #include "mozilla/TimeStamp.h"
 
 #define NR_TRIALS 10000
-#define NR_SPAMS  50000
+#define NR_SPAMS  25000
 
 namespace mozilla {
 namespace _ipdltest {
 
 class TestLatencyParent :
     public PTestLatencyParent
 {
 private:
@@ -38,64 +38,74 @@ protected:
     {
         if (NormalShutdown != why)
             fail("unexpected destruction!");  
 
         passed("\n"
                "  average #ping-pong/sec:        %g\n"
                "  average #ping5-pong5/sec:      %g\n"
                "  average #RPC call-answer/sec:  %g\n"
-               "  average #spams/sec:            %g\n",
+               "  average #spams/sec:            %g\n"
+               "  pct. spams compressed away:    %g\n",
                double(NR_TRIALS) / mPPTimeTotal.ToSecondsSigDigits(),
                double(NR_TRIALS) / mPP5TimeTotal.ToSecondsSigDigits(),
                double(NR_TRIALS) / mRpcTimeTotal.ToSecondsSigDigits(),
-               double(NR_SPAMS) / mSpamTimeTotal.ToSecondsSigDigits());
+               double(NR_SPAMS) / mSpamTimeTotal.ToSecondsSigDigits(),
+               100.0 * (double(NR_SPAMS - mNumChildProcessedCompressedSpams) /
+                        double(NR_SPAMS)));
 
         QuitParent();
     }
 
 private:
     void PingPongTrial();
     void Ping5Pong5Trial();
     void RpcTrials();
     void SpamTrial();
+    void CompressedSpamTrial();
     void Exit();
 
     TimeStamp mStart;
     TimeDuration mPPTimeTotal;
     TimeDuration mPP5TimeTotal;
     TimeDuration mRpcTimeTotal;
     TimeDuration mSpamTimeTotal;
 
     int mPPTrialsToGo;
     int mPP5TrialsToGo;
-    int mSpamsToGo;
+    uint32_t mNumChildProcessedCompressedSpams;
 };
 
 
 class TestLatencyChild :
     public PTestLatencyChild
 {
 public:
     TestLatencyChild();
     virtual ~TestLatencyChild();
 
 protected:
     virtual bool RecvPing() MOZ_OVERRIDE;
     virtual bool RecvPing5() MOZ_OVERRIDE;
     virtual bool AnswerRpc() MOZ_OVERRIDE;
     virtual bool RecvSpam() MOZ_OVERRIDE;
     virtual bool AnswerSynchro() MOZ_OVERRIDE;
+    virtual bool RecvCompressedSpam(const uint32_t& seqno) MOZ_OVERRIDE;
+    virtual bool AnswerSynchro2(uint32_t* lastSeqno,
+                                uint32_t* numMessagesDispatched) MOZ_OVERRIDE;
 
     virtual void ActorDestroy(ActorDestroyReason why) MOZ_OVERRIDE
     {
         if (NormalShutdown != why)
             fail("unexpected destruction!");
         QuitChild();
     }
+
+    uint32_t mLastSeqno;
+    uint32_t mNumProcessedCompressedSpams;
 };
 
 
 } // namespace _ipdltest
 } // namespace mozilla
 
 
 #endif // ifndef mozilla__ipdltest_TestLatency_h