Bug 786234 - Part 3: Plumbing for filtration and bundle usage detection. r=abr
☠☠ backed out by 2b0258a49e8d ☠ ☠
authorByron Campen [:bwc] <docfaraday@gmail.com>
Tue, 14 Jan 2014 16:29:42 -0800
changeset 170224 c5334aea64331b9dcc282082cd16a5511917e53d
parent 170223 53d8b186e5742151279233834e73eb0eef02a6c9
child 170225 c8083d830fa6b2fa4fa4e549d4942b64b289a7bd
push id26283
push userkwierso@gmail.com
push dateTue, 25 Feb 2014 01:45:31 +0000
treeherdermozilla-central@e3daaa4c73dd [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersabr
bugs786234
milestone30.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 786234 - Part 3: Plumbing for filtration and bundle usage detection. r=abr
media/mtransport/transportflow.cpp
media/mtransport/transportflow.h
media/webrtc/signaling/src/media/VcmSIPCCBinding.cpp
media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
media/webrtc/signaling/src/mediapipeline/MediaPipeline.h
media/webrtc/signaling/src/peerconnection/PeerConnectionImpl.cpp
media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.cpp
media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.h
media/webrtc/signaling/test/mediapipeline_unittest.cpp
--- a/media/mtransport/transportflow.cpp
+++ b/media/mtransport/transportflow.cpp
@@ -194,16 +194,27 @@ TransportResult TransportFlow::SendPacke
   CheckThread();
 
   if (state_ != TransportLayer::TS_OPEN) {
     return TE_ERROR;
   }
   return top() ? top()->SendPacket(data, len) : TE_ERROR;
 }
 
+bool TransportFlow::Contains(TransportLayer *layer) const {
+  if (layers_) {
+    for (auto l = layers_->begin(); l != layers_->end(); ++l) {
+      if (*l == layer) {
+        return true;
+      }
+    }
+  }
+  return false;
+}
+
 void TransportFlow::EnsureSameThread(TransportLayer *layer)  {
   // Enforce that if any of the layers have a thread binding,
   // they all have the same binding.
   if (target_) {
     const nsCOMPtr<nsIEventTarget>& lthread = layer->GetThread();
 
     if (lthread && (lthread != target_))
       MOZ_CRASH();
--- a/media/mtransport/transportflow.h
+++ b/media/mtransport/transportflow.h
@@ -91,16 +91,18 @@ class TransportFlow : public sigslot::ha
   // State has changed. Reflects the top flow.
   sigslot::signal2<TransportFlow *, TransportLayer::State>
     SignalStateChange;
 
   // Data received on the flow
   sigslot::signal3<TransportFlow*, const unsigned char *, size_t>
     SignalPacketReceived;
 
+  bool Contains(TransportLayer *layer) const;
+
   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(TransportFlow)
 
  private:
   DISALLOW_COPY_ASSIGN(TransportFlow);
 
   // Check if we are on the right thread
   void CheckThread() const {
     if (!CheckThreadInt())
--- a/media/webrtc/signaling/src/media/VcmSIPCCBinding.cpp
+++ b/media/webrtc/signaling/src/media/VcmSIPCCBinding.cpp
@@ -7,16 +7,17 @@
 #include "CC_Common.h"
 
 #include "CSFMediaProvider.h"
 #include "CSFAudioTermination.h"
 #include "CSFVideoTermination.h"
 #include "MediaConduitErrors.h"
 #include "MediaConduitInterface.h"
 #include "MediaPipeline.h"
+#include "MediaPipelineFilter.h"
 #include "VcmSIPCCBinding.h"
 #include "csf_common.h"
 #include "PeerConnectionImpl.h"
 #include "PeerConnectionMedia.h"
 #include "nsThreadUtils.h"
 #include "transportflow.h"
 #include "transportlayer.h"
 #include "transportlayerdtls.h"
@@ -1496,17 +1497,22 @@ static int vcmRxStartICE_m(cc_mcapid_t m
         const char *peerconnection,
         int num_payloads,
         const vcm_payload_info_t* payloads,
         sdp_setup_type_e setup_type,
         const char *fingerprint_alg,
         const char *fingerprint,
         vcm_mediaAttrs_t *attrs)
 {
-  CSFLogDebug( logTag, "%s(%s)", __FUNCTION__, peerconnection);
+  CSFLogDebug( logTag, "%s(%s) track = %d, stream = %d, level = %d",
+              __FUNCTION__,
+              peerconnection,
+              pc_track_id,
+              pc_stream_id,
+              level);
 
   // Find the PC.
   sipcc::PeerConnectionWrapper pc(peerconnection);
   ENSURE_PC(pc, VCM_ERROR);
 
   // Datachannel will use this though not for RTP
   mozilla::RefPtr<TransportFlow> rtp_flow =
     vcmCreateTransportFlow(pc.impl(), level, false, setup_type,
@@ -1532,25 +1538,79 @@ static int vcmRxStartICE_m(cc_mcapid_t m
     pc.impl()->media()->GetRemoteStream(pc_stream_id);
   if (!stream) {
     // This should never happen
     PR_ASSERT(PR_FALSE);
     return VCM_ERROR;
   }
 
   mozilla::RefPtr<TransportFlow> rtcp_flow = nullptr;
-  if(!attrs->rtcp_mux) {
+  if (!attrs->rtcp_mux) {
     rtcp_flow = vcmCreateTransportFlow(pc.impl(), level, true, setup_type,
                                        fingerprint_alg, fingerprint);
     if (!rtcp_flow) {
       CSFLogError( logTag, "Could not create RTCP flow");
       return VCM_ERROR;
     }
   }
 
+  // If we're offering bundle, a given MediaPipeline could receive traffic on
+  // two different network flows depending on whether the answerer accepts,
+  // before any answer comes in. We need to be prepared for both cases.
+  nsAutoPtr<mozilla::MediaPipelineFilter> filter;
+  RefPtr<TransportFlow> bundle_rtp_flow;
+  RefPtr<TransportFlow> bundle_rtcp_flow;
+  if (attrs->bundle_level) {
+    filter = new MediaPipelineFilter;
+    // Record our correlator, if present in our offer.
+    filter->SetCorrelator(attrs->bundle_stream_correlator);
+
+    // Record our own ssrcs (these are _not_ those of the remote end; that
+    // is handled in vcmTxStart)
+    for (int s = 0; s < attrs->num_ssrcs; ++s) {
+      filter->AddLocalSSRC(attrs->ssrcs[s]);
+    }
+
+    // Record the unique payload types
+    for (int p = 0; p < attrs->num_unique_payload_types; ++p) {
+      filter->AddUniquePT(attrs->unique_payload_types[p]);
+    }
+
+    // Do not pass additional TransportFlows if the pipeline will use the same
+    // flow regardless of whether bundle happens or not.
+    if (attrs->bundle_level != (unsigned int)level) {
+      // This might end up creating it, or might reuse it.
+      mozilla::RefPtr<TransportFlow> bundle_rtp_flow =
+        vcmCreateTransportFlow(pc.impl(),
+                               attrs->bundle_level,
+                               false,
+                               setup_type,
+                               fingerprint_alg,
+                               fingerprint);
+
+      if (!bundle_rtp_flow) {
+        CSFLogError( logTag, "Could not create bundle RTP flow");
+        return VCM_ERROR;
+      }
+
+      if (!attrs->rtcp_mux) {
+        bundle_rtcp_flow = vcmCreateTransportFlow(pc.impl(),
+                                                  attrs->bundle_level,
+                                                  true,
+                                                  setup_type,
+                                                  fingerprint_alg,
+                                                  fingerprint);
+        if (!bundle_rtcp_flow) {
+          CSFLogError( logTag, "Could not create bundle RTCP flow");
+          return VCM_ERROR;
+        }
+      }
+    }
+  }
+
   if (CC_IS_AUDIO(mcap_id)) {
     std::vector<mozilla::AudioCodecConfig *> configs;
 
     // Instantiate an appropriate conduit
     mozilla::RefPtr<mozilla::MediaSessionConduit> tx_conduit =
       pc.impl()->media()->GetConduit(level, false);
     MOZ_ASSERT_IF(tx_conduit, tx_conduit->type() == MediaSessionConduit::AUDIO);
 
@@ -1575,27 +1635,31 @@ static int vcmRxStartICE_m(cc_mcapid_t m
         payloads[i].audio.channels,
         payloads[i].audio.bitrate);
       configs.push_back(config_raw);
     }
 
     if (conduit->ConfigureRecvMediaCodecs(configs))
       return VCM_ERROR;
 
-
     // Now we have all the pieces, create the pipeline
     mozilla::RefPtr<mozilla::MediaPipeline> pipeline =
       new mozilla::MediaPipelineReceiveAudio(
         pc.impl()->GetHandle(),
         pc.impl()->GetMainThread().get(),
         pc.impl()->GetSTSThread(),
         stream->GetMediaStream()->GetStream(),
         pc_track_id,
         level,
-        conduit, rtp_flow, rtcp_flow);
+        conduit,
+        rtp_flow,
+        rtcp_flow,
+        bundle_rtp_flow,
+        bundle_rtcp_flow,
+        filter);
 
     nsresult res = pipeline->Init();
     if (NS_FAILED(res)) {
       CSFLogError(logTag, "Failure initializing audio pipeline");
       return VCM_ERROR;
     }
 
     CSFLogDebug(logTag, "Created audio pipeline %p, conduit=%p, pc_stream=%d pc_track=%d",
@@ -1637,17 +1701,22 @@ static int vcmRxStartICE_m(cc_mcapid_t m
     mozilla::RefPtr<mozilla::MediaPipeline> pipeline =
         new mozilla::MediaPipelineReceiveVideo(
             pc.impl()->GetHandle(),
             pc.impl()->GetMainThread().get(),
             pc.impl()->GetSTSThread(),
             stream->GetMediaStream()->GetStream(),
             pc_track_id,
             level,
-            conduit, rtp_flow, rtcp_flow);
+            conduit,
+            rtp_flow,
+            rtcp_flow,
+            bundle_rtp_flow,
+            bundle_rtcp_flow,
+            filter);
 
     nsresult res = pipeline->Init();
     if (NS_FAILED(res)) {
       CSFLogError(logTag, "Failure initializing video pipeline");
       return VCM_ERROR;
     }
 
     CSFLogDebug(logTag, "Created video pipeline %p, conduit=%p, pc_stream=%d pc_track=%d",
@@ -2177,17 +2246,22 @@ static int vcmTxStartICE_m(cc_mcapid_t m
         const char *peerconnection,
         const vcm_payload_info_t *payload,
         short tos,
         sdp_setup_type_e setup_type,
         const char *fingerprint_alg,
         const char *fingerprint,
         vcm_mediaAttrs_t *attrs)
 {
-  CSFLogDebug( logTag, "%s(%s)", __FUNCTION__, peerconnection);
+  CSFLogDebug( logTag, "%s(%s) track = %d, stream = %d, level = %d",
+              __FUNCTION__,
+              peerconnection,
+              pc_track_id,
+              pc_stream_id,
+              level);
 
   // Find the PC and get the stream
   sipcc::PeerConnectionWrapper pc(peerconnection);
   ENSURE_PC(pc, VCM_ERROR);
   nsRefPtr<sipcc::LocalSourceStreamInfo> stream = pc.impl()->media()->
     GetLocalStream(pc_stream_id);
 
   // Create the transport flows
@@ -2203,16 +2277,32 @@ static int vcmTxStartICE_m(cc_mcapid_t m
     rtcp_flow = vcmCreateTransportFlow(pc.impl(), level, true, setup_type,
                                        fingerprint_alg, fingerprint);
     if (!rtcp_flow) {
       CSFLogError( logTag, "Could not create RTCP flow");
       return VCM_ERROR;
     }
   }
 
+  // This tells the receive MediaPipeline (if there is one) whether we are
+  // doing bundle, and if so, updates the filter. This does not affect the
+  // transmit MediaPipeline (created above) at all.
+  if (attrs->bundle_level) {
+    nsAutoPtr<mozilla::MediaPipelineFilter> filter (new MediaPipelineFilter);
+    for (int s = 0; s < attrs->num_ssrcs; ++s) {
+      filter->AddRemoteSSRC(attrs->ssrcs[s]);
+    }
+    pc.impl()->media()->SetUsingBundle_m(level, true);
+    pc.impl()->media()->UpdateFilterFromRemoteDescription_m(level, filter);
+  } else {
+    // This will also clear the filter.
+    pc.impl()->media()->SetUsingBundle_m(level, false);
+  }
+
+
   if (CC_IS_AUDIO(mcap_id)) {
     mozilla::AudioCodecConfig *config_raw;
     config_raw = new mozilla::AudioCodecConfig(
       payload->remote_rtp_pt,
       ccsdpCodecName(payload->codec_type),
       payload->audio.frequency,
       payload->audio.packet_size,
       payload->audio.channels,
--- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
+++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp
@@ -37,16 +37,19 @@
 #include "transportlayerdtls.h"
 #include "transportlayerice.h"
 #include "runnable_utils.h"
 #include "gfxImageSurface.h"
 #include "libyuv/convert.h"
 #include "mozilla/gfx/Point.h"
 #include "mozilla/gfx/Types.h"
 
+#include "webrtc/modules/interface/module_common_types.h"
+#include "webrtc/modules/rtp_rtcp/interface/rtp_header_parser.h"
+
 using namespace mozilla;
 using namespace mozilla::gfx;
 
 // Logging context
 MOZ_MTLOG_MODULE("mediapipeline")
 
 namespace mozilla {
 
@@ -70,126 +73,138 @@ nsresult MediaPipeline::Init() {
   return NS_OK;
 }
 
 nsresult MediaPipeline::Init_s() {
   ASSERT_ON_THREAD(sts_thread_);
   conduit_->AttachTransport(transport_);
 
   nsresult res;
-  MOZ_ASSERT(rtp_transport_);
-  // Look to see if the transport is ready
-  rtp_transport_->SignalStateChange.connect(this,
-                                            &MediaPipeline::StateChange);
+  MOZ_ASSERT(rtp_.transport_);
+  MOZ_ASSERT(rtcp_.transport_);
+  res = ConnectTransport_s(rtp_);
+  if (NS_FAILED(res)) {
+    return res;
+  }
 
-  if (rtp_transport_->state() == TransportLayer::TS_OPEN) {
-    res = TransportReady_s(rtp_transport_);
+  if (rtcp_.transport_ != rtp_.transport_) {
+    res = ConnectTransport_s(rtcp_);
     if (NS_FAILED(res)) {
-      MOZ_MTLOG(ML_ERROR, "Error calling TransportReady(); res="
-                << static_cast<uint32_t>(res) << " in " << __FUNCTION__);
       return res;
     }
-  } else if (rtp_transport_->state() == TransportLayer::TS_ERROR) {
-    MOZ_MTLOG(ML_ERROR, "RTP transport is already in error state");
-    TransportFailed_s(rtp_transport_);
-    return NS_ERROR_FAILURE;
   }
 
-  // If rtcp_transport_ is the same as rtp_transport_ then we are muxing.
-  // Otherwise, set it up separately.
-  if (rtcp_transport_ != rtp_transport_) {
-    rtcp_transport_->SignalStateChange.connect(this,
-                                               &MediaPipeline::StateChange);
+  if (possible_bundle_rtp_) {
+    MOZ_ASSERT(possible_bundle_rtcp_);
+    MOZ_ASSERT(possible_bundle_rtp_->transport_);
+    MOZ_ASSERT(possible_bundle_rtcp_->transport_);
 
-    if (rtcp_transport_->state() == TransportLayer::TS_OPEN) {
-      res = TransportReady_s(rtcp_transport_);
+    res = ConnectTransport_s(*possible_bundle_rtp_);
+    if (NS_FAILED(res)) {
+      return res;
+    }
+
+    if (possible_bundle_rtcp_->transport_ != possible_bundle_rtp_->transport_) {
+      res = ConnectTransport_s(*possible_bundle_rtcp_);
       if (NS_FAILED(res)) {
-        MOZ_MTLOG(ML_ERROR, "Error calling TransportReady(); res="
-                  << static_cast<uint32_t>(res) << " in " << __FUNCTION__);
         return res;
       }
-    } else if (rtcp_transport_->state() == TransportLayer::TS_ERROR) {
-      MOZ_MTLOG(ML_ERROR, "RTCP transport is already in error state");
-      TransportFailed_s(rtcp_transport_);
-      return NS_ERROR_FAILURE;
     }
   }
 
   return NS_OK;
 }
 
 
 // Disconnect us from the transport so that we can cleanly destruct the
 // pipeline on the main thread.  ShutdownMedia_m() must have already been
 // called
 void MediaPipeline::ShutdownTransport_s() {
   ASSERT_ON_THREAD(sts_thread_);
   MOZ_ASSERT(!stream_); // verifies that ShutdownMedia_m() has run
 
   disconnect_all();
   transport_->Detach();
-  rtp_transport_ = nullptr;
-  rtcp_transport_ = nullptr;
+  rtp_.transport_ = nullptr;
+  rtcp_.transport_ = nullptr;
+  possible_bundle_rtp_ = nullptr;
+  possible_bundle_rtcp_ = nullptr;
 }
 
 void MediaPipeline::StateChange(TransportFlow *flow, TransportLayer::State state) {
-  // If rtcp_transport_ is the same as rtp_transport_ then we are muxing.
-  // So the only flow should be the RTP flow.
-  if (rtcp_transport_ == rtp_transport_) {
-    MOZ_ASSERT(flow == rtp_transport_);
-  }
+  TransportInfo* info = GetTransportInfo_s(flow);
+  MOZ_ASSERT(info);
 
   if (state == TransportLayer::TS_OPEN) {
     MOZ_MTLOG(ML_INFO, "Flow is ready");
-    TransportReady_s(flow);
+    TransportReady_s(*info);
   } else if (state == TransportLayer::TS_CLOSED ||
              state == TransportLayer::TS_ERROR) {
-    TransportFailed_s(flow);
+    TransportFailed_s(*info);
   }
 }
 
-nsresult MediaPipeline::TransportReady_s(TransportFlow *flow) {
+static bool MakeRtpTypeToStringArray(const char** array) {
+  static const char* RTP_str = "RTP";
+  static const char* RTCP_str = "RTCP";
+  static const char* MUX_str = "RTP/RTCP mux";
+  array[MediaPipeline::RTP] = RTP_str;
+  array[MediaPipeline::RTCP] = RTCP_str;
+  array[MediaPipeline::MUX] = MUX_str;
+  return true;
+}
+
+static const char* ToString(MediaPipeline::RtpType type) {
+  static const char* array[(int)MediaPipeline::MAX_RTP_TYPE] = {nullptr};
+  // Dummy variable to cause init to happen only on first call
+  static bool dummy = MakeRtpTypeToStringArray(array);
+  (void)dummy;
+  return array[type];
+}
+
+nsresult MediaPipeline::TransportReady_s(TransportInfo &info) {
   MOZ_ASSERT(!description_.empty());
-  bool rtcp = !(flow == rtp_transport_);
-  State *state = rtcp ? &rtcp_state_ : &rtp_state_;
 
   // TODO(ekr@rtfm.com): implement some kind of notification on
   // failure. bug 852665.
-  if (*state != MP_CONNECTING) {
+  if (info.state_ != MP_CONNECTING) {
     MOZ_MTLOG(ML_ERROR, "Transport ready for flow in wrong state:" <<
-              description_ << ": " << (rtcp ? "rtcp" : "rtp"));
+              description_ << ": " << ToString(info.type_));
     return NS_ERROR_FAILURE;
   }
 
-  nsresult res;
-
   MOZ_MTLOG(ML_INFO, "Transport ready for pipeline " <<
             static_cast<void *>(this) << " flow " << description_ << ": " <<
-            (rtcp ? "rtcp" : "rtp"));
+            ToString(info.type_));
+
+  // TODO(bcampen@mozilla.com): Should we disconnect from the flow on failure?
+  nsresult res;
 
   // Now instantiate the SRTP objects
   TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
-      flow->GetLayer(TransportLayerDtls::ID()));
+      info.transport_->GetLayer(TransportLayerDtls::ID()));
   MOZ_ASSERT(dtls);  // DTLS is mandatory
 
   uint16_t cipher_suite;
   res = dtls->GetSrtpCipher(&cipher_suite);
   if (NS_FAILED(res)) {
     MOZ_MTLOG(ML_ERROR, "Failed to negotiate DTLS-SRTP. This is an error");
-    *state = MP_CLOSED;
+    info.state_ = MP_CLOSED;
+    UpdateRtcpMuxState(info);
     return res;
   }
 
   // SRTP Key Exporter as per RFC 5764 S 4.2
   unsigned char srtp_block[SRTP_TOTAL_KEY_LENGTH * 2];
   res = dtls->ExportKeyingMaterial(kDTLSExporterLabel, false, "",
                                    srtp_block, sizeof(srtp_block));
   if (NS_FAILED(res)) {
     MOZ_MTLOG(ML_ERROR, "Failed to compute DTLS-SRTP keys. This is an error");
-    *state = MP_CLOSED;
+    info.state_ = MP_CLOSED;
+    UpdateRtcpMuxState(info);
     MOZ_CRASH();  // TODO: Remove once we have enough field experience to
                   // know it doesn't happen. bug 798797. Note that the
                   // code after this never executes.
     return res;
   }
 
   // Slice and dice as per RFC 5764 S 4.2
   unsigned char client_write_key[SRTP_TOTAL_KEY_LENGTH];
@@ -213,106 +228,99 @@ nsresult MediaPipeline::TransportReady_s
   if (dtls->role() == TransportLayerDtls::CLIENT) {
     write_key = client_write_key;
     read_key = server_write_key;
   } else {
     write_key = server_write_key;
     read_key = client_write_key;
   }
 
-  if (!rtcp) {
-    // RTP side
-    MOZ_ASSERT(!rtp_send_srtp_ && !rtp_recv_srtp_);
-    rtp_send_srtp_ = SrtpFlow::Create(cipher_suite, false,
-                                      write_key, SRTP_TOTAL_KEY_LENGTH);
-    rtp_recv_srtp_ = SrtpFlow::Create(cipher_suite, true,
-                                      read_key, SRTP_TOTAL_KEY_LENGTH);
-    if (!rtp_send_srtp_ || !rtp_recv_srtp_) {
-      MOZ_MTLOG(ML_ERROR, "Couldn't create SRTP flow for RTCP");
-      *state = MP_CLOSED;
-      return NS_ERROR_FAILURE;
-    }
+  MOZ_ASSERT(!info.send_srtp_ && !info.recv_srtp_);
+  info.send_srtp_ = SrtpFlow::Create(cipher_suite, false, write_key,
+                                     SRTP_TOTAL_KEY_LENGTH);
+  info.recv_srtp_ = SrtpFlow::Create(cipher_suite, true, read_key,
+                                     SRTP_TOTAL_KEY_LENGTH);
+  if (!info.send_srtp_ || !info.recv_srtp_) {
+    MOZ_MTLOG(ML_ERROR, "Couldn't create SRTP flow for "
+              << ToString(info.type_));
+    info.state_ = MP_CLOSED;
+    UpdateRtcpMuxState(info);
+    return NS_ERROR_FAILURE;
+  }
 
-    // Start listening
-    // If rtcp_transport_ is the same as rtp_transport_ then we are muxing
-    if (rtcp_transport_ == rtp_transport_) {
-      MOZ_ASSERT(!rtcp_send_srtp_ && !rtcp_recv_srtp_);
-      rtcp_send_srtp_ = rtp_send_srtp_;
-      rtcp_recv_srtp_ = rtp_recv_srtp_;
-
-      MOZ_MTLOG(ML_INFO, "Listening for packets received on " <<
-                static_cast<void *>(dtls->downward()));
+  if (direction_ == RECEIVE) {
+    // The TRANSMIT pipeline does not process _any_ RTCP. This is the RECEIVE
+    // pipeline's job, even for receiver reports.
+    MOZ_MTLOG(ML_INFO, "Listening for " << ToString(info.type_)
+                       << " packets received on " <<
+                       static_cast<void *>(dtls->downward()));
 
-      dtls->downward()->SignalPacketReceived.connect(this,
-                                                     &MediaPipeline::
-                                                     PacketReceived);
-      rtcp_state_ = MP_OPEN;
-    } else {
-      MOZ_MTLOG(ML_INFO, "Listening for RTP packets received on " <<
-                static_cast<void *>(dtls->downward()));
-
-      dtls->downward()->SignalPacketReceived.connect(this,
-                                                     &MediaPipeline::
-                                                     RtpPacketReceived);
+    switch (info.type_) {
+      case RTP:
+        dtls->downward()->SignalPacketReceived.connect(
+            this,
+            &MediaPipeline::RtpPacketReceived);
+        break;
+      case RTCP:
+        dtls->downward()->SignalPacketReceived.connect(
+            this,
+            &MediaPipeline::RtcpPacketReceived);
+        break;
+      case MUX:
+        dtls->downward()->SignalPacketReceived.connect(
+            this,
+            &MediaPipeline::PacketReceived);
+        break;
+      default:
+        MOZ_CRASH();
     }
   }
-  else {
-    MOZ_ASSERT(!rtcp_send_srtp_ && !rtcp_recv_srtp_);
-    rtcp_send_srtp_ = SrtpFlow::Create(cipher_suite, false,
-                                       write_key, SRTP_TOTAL_KEY_LENGTH);
-    rtcp_recv_srtp_ = SrtpFlow::Create(cipher_suite, true,
-                                       read_key, SRTP_TOTAL_KEY_LENGTH);
-    if (!rtcp_send_srtp_ || !rtcp_recv_srtp_) {
-      MOZ_MTLOG(ML_ERROR, "Couldn't create SRTCP flow for RTCP");
-      *state = MP_CLOSED;
-      return NS_ERROR_FAILURE;
-    }
 
-    MOZ_MTLOG(ML_DEBUG, "Listening for RTCP packets received on " <<
-              static_cast<void *>(dtls->downward()));
-
-    // Start listening
-    dtls->downward()->SignalPacketReceived.connect(this,
-                                                  &MediaPipeline::
-                                                  RtcpPacketReceived);
-  }
-
-  *state = MP_OPEN;
+  info.state_ = MP_OPEN;
+  UpdateRtcpMuxState(info);
   return NS_OK;
 }
 
-nsresult MediaPipeline::TransportFailed_s(TransportFlow *flow) {
+nsresult MediaPipeline::TransportFailed_s(TransportInfo &info) {
   ASSERT_ON_THREAD(sts_thread_);
-  bool rtcp = !(flow == rtp_transport_);
-
-  State *state = rtcp ? &rtcp_state_ : &rtp_state_;
-
-  *state = MP_CLOSED;
 
-  // If rtcp_transport_ is the same as rtp_transport_ then we are muxing
-  if(rtcp_transport_ == rtp_transport_) {
-    MOZ_ASSERT(state != &rtcp_state_);
-    rtcp_state_ = MP_CLOSED;
-  }
+  info.state_ = MP_CLOSED;
+  UpdateRtcpMuxState(info);
 
-
-  MOZ_MTLOG(ML_INFO, "Transport closed for flow " << (rtcp ? "rtcp" : "rtp"));
+  MOZ_MTLOG(ML_INFO, "Transport closed for flow " << ToString(info.type_));
 
   NS_WARNING(
       "MediaPipeline Transport failed. This is not properly cleaned up yet");
 
-
   // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
   // connection was good and now it is bad.
   // TODO(ekr@rtfm.com): Report up so that the PC knows we
   // have experienced an error.
 
   return NS_OK;
 }
 
+void MediaPipeline::UpdateRtcpMuxState(TransportInfo &info) {
+  if (info.type_ == MUX) {
+    if (info.transport_ == rtcp_.transport_) {
+      rtcp_.state_ = info.state_;
+      if (!rtcp_.send_srtp_) {
+        rtcp_.send_srtp_ = info.send_srtp_;
+        rtcp_.recv_srtp_ = info.recv_srtp_;
+      }
+    } else if (possible_bundle_rtcp_ &&
+               info.transport_ == possible_bundle_rtcp_->transport_) {
+      possible_bundle_rtcp_->state_ = info.state_;
+      if (!possible_bundle_rtcp_->send_srtp_) {
+        possible_bundle_rtcp_->send_srtp_ = info.send_srtp_;
+        possible_bundle_rtcp_->recv_srtp_ = info.recv_srtp_;
+      }
+    }
+  }
+}
 
 nsresult MediaPipeline::SendPacket(TransportFlow *flow, const void *data,
                                    int len) {
   ASSERT_ON_THREAD(sts_thread_);
 
   // Note that we bypass the DTLS layer here
   TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
       flow->GetLayer(TransportLayerDtls::ID()));
@@ -335,50 +343,50 @@ nsresult MediaPipeline::SendPacket(Trans
 
 void MediaPipeline::increment_rtp_packets_sent(int32_t bytes) {
   ++rtp_packets_sent_;
   rtp_bytes_sent_ += bytes;
 
   if (!(rtp_packets_sent_ % 100)) {
     MOZ_MTLOG(ML_INFO, "RTP sent packet count for " << description_
               << " Pipeline " << static_cast<void *>(this)
-              << " Flow : " << static_cast<void *>(rtp_transport_)
+              << " Flow : " << static_cast<void *>(rtp_.transport_)
               << ": " << rtp_packets_sent_
               << " (" << rtp_bytes_sent_ << " bytes)");
   }
 }
 
 void MediaPipeline::increment_rtcp_packets_sent() {
   ++rtcp_packets_sent_;
   if (!(rtcp_packets_sent_ % 100)) {
     MOZ_MTLOG(ML_INFO, "RTCP sent packet count for " << description_
               << " Pipeline " << static_cast<void *>(this)
-              << " Flow : " << static_cast<void *>(rtcp_transport_)
+              << " Flow : " << static_cast<void *>(rtcp_.transport_)
               << ": " << rtcp_packets_sent_);
   }
 }
 
 void MediaPipeline::increment_rtp_packets_received(int32_t bytes) {
   ++rtp_packets_received_;
   rtp_bytes_received_ += bytes;
   if (!(rtp_packets_received_ % 100)) {
     MOZ_MTLOG(ML_INFO, "RTP received packet count for " << description_
               << " Pipeline " << static_cast<void *>(this)
-              << " Flow : " << static_cast<void *>(rtp_transport_)
+              << " Flow : " << static_cast<void *>(rtp_.transport_)
               << ": " << rtp_packets_received_
               << " (" << rtp_bytes_received_ << " bytes)");
   }
 }
 
 void MediaPipeline::increment_rtcp_packets_received() {
   ++rtcp_packets_received_;
   if (!(rtcp_packets_received_ % 100)) {
     MOZ_MTLOG(ML_INFO, "RTCP received packet count for " << description_
               << " Pipeline " << static_cast<void *>(this)
-              << " Flow : " << static_cast<void *>(rtcp_transport_)
+              << " Flow : " << static_cast<void *>(rtcp_.transport_)
               << ": " << rtcp_packets_received_);
   }
 }
 
 void MediaPipeline::RtpPacketReceived(TransportLayer *layer,
                                       const unsigned char *data,
                                       size_t len) {
   if (!transport_->pipeline()) {
@@ -386,44 +394,94 @@ void MediaPipeline::RtpPacketReceived(Tr
     return;
   }
 
   if (!conduit_) {
     MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; media disconnected");
     return;
   }
 
-  if (rtp_state_ != MP_OPEN) {
+  TransportInfo* info = &rtp_;
+
+  if (possible_bundle_rtp_ &&
+      possible_bundle_rtp_->transport_->Contains(layer)) {
+    // Received this on our possible bundle transport. Override info.
+    info = possible_bundle_rtp_;
+  }
+
+  // TODO(bcampen@mozilla.com): Can either of these actually happen? If not,
+  // the info variable can be removed, and this function gets simpler.
+  if (info->state_ != MP_OPEN) {
     MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; pipeline not open");
     return;
   }
 
-  if (rtp_transport_->state() != TransportLayer::TS_OPEN) {
+  if (info->transport_->state() != TransportLayer::TS_OPEN) {
     MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport not open");
     return;
   }
 
-  MOZ_ASSERT(rtp_recv_srtp_);  // This should never happen
+  // This should never happen.
+  MOZ_ASSERT(info->recv_srtp_);
 
   if (direction_ == TRANSMIT) {
-    // Discard any media that is being transmitted to us
-    // This will be unnecessary when we have SSRC filtering.
     return;
   }
 
-  // TODO(ekr@rtfm.com): filter for DTLS here and in RtcpPacketReceived
-  // TODO(ekr@rtfm.com): filter on SSRC for bundle
+  if (possible_bundle_rtp_ && (info == &rtp_))  {
+    // We were not sure we would be using rtp_ or possible_bundle_rtp_, but we
+    // have just received traffic that clears this up.
+    // Don't let our filter prevent us from noticing this, if the filter is
+    // incomplete (ie; no SSRCs in remote SDP, or no remote SDP at all).
+    SetUsingBundle_s(false);
+    MOZ_MTLOG(ML_INFO, "Ruled out the possibility that we're receiving bundle "
+                       "for " << description_);
+    // TODO(bcampen@mozilla.com): Might be nice to detect when every
+    // MediaPipeline but the master has determined that it isn't doing bundle,
+    // since that means the master isn't doing bundle either. We could maybe
+    // do this by putting some refcounted dummy variable in the filters, and
+    // checking the value of the refcount. It is not clear whether this is
+    // going to be useful in practice.
+  }
+
+  if (!len) {
+    return;
+  }
+
+  // Filter out everything but RTP/RTCP
+  if (data[0] < 128 || data[0] > 191) {
+    return;
+  }
+
+  if (filter_) {
+    webrtc::RTPHeader header;
+    if (!rtp_parser_->Parse(data, len, &header) ||
+        !filter_->Filter(header)) {
+      return;
+    }
+  }
+
+  if (possible_bundle_rtp_) {
+    // Just got traffic that passed our filter on the potential bundle
+    // transport. Must be doing bundle.
+    SetUsingBundle_s(true);
+    MOZ_MTLOG(ML_INFO, "Confirmed the possibility that we're receiving bundle");
+  }
+
+  // Everything is decided now; just use rtp_
+  MOZ_ASSERT(!possible_bundle_rtp_);
+  MOZ_ASSERT(!possible_bundle_rtcp_);
 
   // Make a copy rather than cast away constness
   ScopedDeletePtr<unsigned char> inner_data(
       new unsigned char[len]);
   memcpy(inner_data, data, len);
   int out_len = 0;
-  nsresult res = rtp_recv_srtp_->UnprotectRtp(inner_data,
-                                              len, len, &out_len);
+  nsresult res = rtp_.recv_srtp_->UnprotectRtp(inner_data,
+                                               len, len, &out_len);
   if (!NS_SUCCEEDED(res)) {
     char tmp[16];
 
     PR_snprintf(tmp, sizeof(tmp), "%.2x %.2x %.2x %.2x",
                 inner_data[0],
                 inner_data[1],
                 inner_data[2],
                 inner_data[3]);
@@ -446,43 +504,79 @@ void MediaPipeline::RtcpPacketReceived(T
     return;
   }
 
   if (!conduit_) {
     MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; media disconnected");
     return;
   }
 
-  if (rtcp_state_ != MP_OPEN) {
+  TransportInfo* info = &rtcp_;
+  if (possible_bundle_rtcp_ &&
+      possible_bundle_rtcp_->transport_->Contains(layer)) {
+    info = possible_bundle_rtcp_;
+  }
+
+  if (info->state_ != MP_OPEN) {
     MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; pipeline not open");
     return;
   }
 
-  if (rtcp_transport_->state() != TransportLayer::TS_OPEN) {
+  if (info->transport_->state() != TransportLayer::TS_OPEN) {
     MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport not open");
     return;
   }
 
-  if (direction_ == RECEIVE) {
-    // Discard any RTCP that is being transmitted to us
-    // This will be unnecessary when we have SSRC filtering.
+  if (possible_bundle_rtp_ && (info == &rtcp_)) {
+    // We have offered bundle, and received our first packet on a non-bundle
+    // address. We are definitely not using the bundle address.
+    SetUsingBundle_s(false);
+  }
+
+  if (!len) {
+    return;
+  }
+
+  // Filter out everything but RTP/RTCP
+  if (data[0] < 128 || data[0] > 191) {
     return;
   }
 
+  MediaPipelineFilter::Result filter_result = MediaPipelineFilter::PASS;
+  if (filter_) {
+    filter_result = filter_->FilterRTCP(data, len);
+    if (filter_result == MediaPipelineFilter::FAIL) {
+      return;
+    }
+  }
+
+  if (filter_result == MediaPipelineFilter::PASS && possible_bundle_rtp_) {
+    // Just got traffic that passed our filter on the potential bundle
+    // transport. Must be doing bundle.
+    SetUsingBundle_s(true);
+  }
+
+  // We continue using info here, since it is possible that the filter did not
+  // support the payload type (ie; returned MediaPipelineFilter::UNSUPPORTED).
+  // In this case, we just let it pass, and hope the webrtc.org code does
+  // something sane.
   increment_rtcp_packets_received();
 
-  MOZ_ASSERT(rtcp_recv_srtp_);  // This should never happen
+  MOZ_ASSERT(info->recv_srtp_);  // This should never happen
 
   // Make a copy rather than cast away constness
   ScopedDeletePtr<unsigned char> inner_data(
       new unsigned char[len]);
   memcpy(inner_data, data, len);
   int out_len;
 
-  nsresult res = rtcp_recv_srtp_->UnprotectRtcp(inner_data, len, len, &out_len);
+  nsresult res = info->recv_srtp_->UnprotectRtcp(inner_data,
+                                                 len,
+                                                 len,
+                                                 &out_len);
 
   if (!NS_SUCCEEDED(res))
     return;
 
   (void)conduit_->ReceivedRTCPPacket(inner_data, out_len);  // Ignore error codes
 }
 
 bool MediaPipeline::IsRtp(const unsigned char *data, size_t len) {
@@ -558,28 +652,146 @@ nsresult MediaPipelineTransmit::Init() {
   // unqueued (and not resampled) data
   if (domstream_->AddDirectListener(listener_)) {
     listener_->direct_connect_ = true;
   }
 
   return MediaPipeline::Init();
 }
 
-nsresult MediaPipelineTransmit::TransportReady_s(TransportFlow *flow) {
+nsresult MediaPipelineTransmit::TransportReady_s(TransportInfo &info) {
+  ASSERT_ON_THREAD(sts_thread_);
   // Call base ready function.
-  MediaPipeline::TransportReady_s(flow);
+  MediaPipeline::TransportReady_s(info);
 
-  if (flow == rtp_transport_) {
+  // Should not be set for a transmitter
+  MOZ_ASSERT(!possible_bundle_rtp_);
+  if (&info == &rtp_) {
     // TODO(ekr@rtfm.com): Move onto MSG thread.
     listener_->SetActive(true);
   }
 
   return NS_OK;
 }
 
+void MediaPipeline::DisconnectTransport_s(TransportInfo &info) {
+  MOZ_ASSERT(info.transport_);
+  ASSERT_ON_THREAD(sts_thread_);
+
+  info.transport_->SignalStateChange.disconnect(this);
+  // We do this even if we're a transmitter, since we are still possibly
+  // registered to receive RTCP.
+  TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
+      info.transport_->GetLayer(TransportLayerDtls::ID()));
+  MOZ_ASSERT(dtls);  // DTLS is mandatory
+  MOZ_ASSERT(dtls->downward());
+  dtls->downward()->SignalPacketReceived.disconnect(this);
+}
+
+nsresult MediaPipeline::ConnectTransport_s(TransportInfo &info) {
+  MOZ_ASSERT(info.transport_);
+  ASSERT_ON_THREAD(sts_thread_);
+
+  // Look to see if the transport is ready
+  if (info.transport_->state() == TransportLayer::TS_OPEN) {
+    nsresult res = TransportReady_s(info);
+    if (NS_FAILED(res)) {
+      MOZ_MTLOG(ML_ERROR, "Error calling TransportReady(); res="
+                << static_cast<uint32_t>(res) << " in " << __FUNCTION__);
+      return res;
+    }
+  } else if (info.transport_->state() == TransportLayer::TS_ERROR) {
+    MOZ_MTLOG(ML_ERROR, ToString(info.type_)
+                        << "transport is already in error state");
+    TransportFailed_s(info);
+    return NS_ERROR_FAILURE;
+  }
+
+  info.transport_->SignalStateChange.connect(this,
+                                             &MediaPipeline::StateChange);
+
+  return NS_OK;
+}
+
+MediaPipeline::TransportInfo* MediaPipeline::GetTransportInfo_s(
+    TransportFlow *flow) {
+  ASSERT_ON_THREAD(sts_thread_);
+  if (flow == rtp_.transport_) {
+    return &rtp_;
+  }
+
+  if (flow == rtcp_.transport_) {
+    return &rtcp_;
+  }
+
+  if (possible_bundle_rtp_) {
+    if (flow == possible_bundle_rtp_->transport_) {
+      return possible_bundle_rtp_;
+    }
+
+    if (flow == possible_bundle_rtcp_->transport_) {
+      return possible_bundle_rtcp_;
+    }
+  }
+
+  return nullptr;
+}
+
+void MediaPipeline::SetUsingBundle_s(bool decision) {
+  ASSERT_ON_THREAD(sts_thread_);
+  // Note: This can be called either because of events on the STS thread, or
+  // by events on the main thread (ie; receiving a remote description). It is
+  // best to be careful of races here, so don't assume that transports are open.
+  if (!possible_bundle_rtp_) {
+    if (!decision) {
+      // This can happen on the master pipeline.
+      filter_ = nullptr;
+    }
+    return;
+  }
+
+  if (direction_ == RECEIVE) {
+    if (decision) {
+      MOZ_MTLOG(ML_INFO, "Non-master pipeline confirmed bundle for "
+                         << description_);
+      // We're doing bundle. Release the unused flows, and copy the ones we
+      // are using into the less wishy-washy members.
+      DisconnectTransport_s(rtp_);
+      DisconnectTransport_s(rtcp_);
+      rtp_ = *possible_bundle_rtp_;
+      rtcp_ = *possible_bundle_rtcp_;
+    } else {
+      MOZ_MTLOG(ML_INFO, "Non-master pipeline confirmed no bundle for "
+                         << description_);
+      // We are not doing bundle
+      DisconnectTransport_s(*possible_bundle_rtp_);
+      DisconnectTransport_s(*possible_bundle_rtcp_);
+      filter_ = nullptr;
+    }
+
+    // We are no longer in an ambiguous state.
+    possible_bundle_rtp_ = nullptr;
+    possible_bundle_rtcp_ = nullptr;
+  }
+}
+
+void MediaPipeline::UpdateFilterFromRemoteDescription_s(
+    nsAutoPtr<MediaPipelineFilter> filter) {
+  ASSERT_ON_THREAD(sts_thread_);
+  // This is only supposed to relax the filter. Relaxing a missing filter is
+  // not possible.
+  MOZ_ASSERT(filter_);
+
+  if (!filter) {
+    filter_ = nullptr;
+  } else {
+    filter_->IncorporateRemoteDescription(*filter);
+  }
+}
+
 nsresult MediaPipeline::PipelineTransport::SendRtpPacket(
     const void *data, int len) {
 
     nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t *>(data),
                                              len));
 
     RUN_ON_THREAD(sts_thread_,
                   WrapRunnable(
@@ -588,46 +800,47 @@ nsresult MediaPipeline::PipelineTranspor
                       buf),
                   NS_DISPATCH_NORMAL);
 
     return NS_OK;
 }
 
 nsresult MediaPipeline::PipelineTransport::SendRtpPacket_s(
     nsAutoPtr<DataBuffer> data) {
+  ASSERT_ON_THREAD(sts_thread_);
   if (!pipeline_)
     return NS_OK;  // Detached
 
-  if (!pipeline_->rtp_send_srtp_) {
+  if (!pipeline_->rtp_.send_srtp_) {
     MOZ_MTLOG(ML_DEBUG, "Couldn't write RTP packet; SRTP not set up yet");
     return NS_OK;
   }
 
-  MOZ_ASSERT(pipeline_->rtp_transport_);
-  NS_ENSURE_TRUE(pipeline_->rtp_transport_, NS_ERROR_NULL_POINTER);
+  MOZ_ASSERT(pipeline_->rtp_.transport_);
+  NS_ENSURE_TRUE(pipeline_->rtp_.transport_, NS_ERROR_NULL_POINTER);
 
   // libsrtp enciphers in place, so we need a new, big enough
   // buffer.
   // XXX. allocates and deletes one buffer per packet sent.
   // Bug 822129
   int max_len = data->len() + SRTP_MAX_EXPANSION;
   ScopedDeletePtr<unsigned char> inner_data(
       new unsigned char[max_len]);
   memcpy(inner_data, data->data(), data->len());
 
   int out_len;
-  nsresult res = pipeline_->rtp_send_srtp_->ProtectRtp(inner_data,
-                                                       data->len(),
-                                                       max_len,
-                                                       &out_len);
+  nsresult res = pipeline_->rtp_.send_srtp_->ProtectRtp(inner_data,
+                                                        data->len(),
+                                                        max_len,
+                                                        &out_len);
   if (!NS_SUCCEEDED(res))
     return res;
 
   pipeline_->increment_rtp_packets_sent(out_len);
-  return pipeline_->SendPacket(pipeline_->rtp_transport_, inner_data,
+  return pipeline_->SendPacket(pipeline_->rtp_.transport_, inner_data,
                                out_len);
 }
 
 nsresult MediaPipeline::PipelineTransport::SendRtcpPacket(
     const void *data, int len) {
 
     nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t *>(data),
                                              len));
@@ -639,46 +852,48 @@ nsresult MediaPipeline::PipelineTranspor
                       buf),
                   NS_DISPATCH_NORMAL);
 
     return NS_OK;
 }
 
 nsresult MediaPipeline::PipelineTransport::SendRtcpPacket_s(
     nsAutoPtr<DataBuffer> data) {
+  ASSERT_ON_THREAD(sts_thread_);
   if (!pipeline_)
     return NS_OK;  // Detached
 
-  if (!pipeline_->rtcp_send_srtp_) {
+  if (!pipeline_->rtcp_.send_srtp_) {
     MOZ_MTLOG(ML_DEBUG, "Couldn't write RTCP packet; SRTCP not set up yet");
     return NS_OK;
   }
 
-  MOZ_ASSERT(pipeline_->rtcp_transport_);
-  NS_ENSURE_TRUE(pipeline_->rtcp_transport_, NS_ERROR_NULL_POINTER);
+  MOZ_ASSERT(pipeline_->rtcp_.transport_);
+  NS_ENSURE_TRUE(pipeline_->rtcp_.transport_, NS_ERROR_NULL_POINTER);
 
   // libsrtp enciphers in place, so we need a new, big enough
   // buffer.
   // XXX. allocates and deletes one buffer per packet sent.
   // Bug 822129.
   int max_len = data->len() + SRTP_MAX_EXPANSION;
   ScopedDeletePtr<unsigned char> inner_data(
       new unsigned char[max_len]);
   memcpy(inner_data, data->data(), data->len());
 
   int out_len;
-  nsresult res = pipeline_->rtcp_send_srtp_->ProtectRtcp(inner_data,
-                                                         data->len(),
-                                                         max_len,
-                                                         &out_len);
+  nsresult res = pipeline_->rtcp_.send_srtp_->ProtectRtcp(inner_data,
+                                                          data->len(),
+                                                          max_len,
+                                                          &out_len);
+
   if (!NS_SUCCEEDED(res))
     return res;
 
   pipeline_->increment_rtcp_packets_sent();
-  return pipeline_->SendPacket(pipeline_->rtcp_transport_, inner_data,
+  return pipeline_->SendPacket(pipeline_->rtcp_.transport_, inner_data,
                                out_len);
 }
 
 // Called if we're attached with AddDirectListener()
 void MediaPipelineTransmit::PipelineListener::
 NotifyRealtimeData(MediaStreamGraph* graph, TrackID tid,
                    TrackRate rate,
                    TrackTicks offset,
--- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h
+++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h
@@ -13,27 +13,30 @@
 #ifdef USE_FAKE_MEDIA_STREAMS
 #include "FakeMediaStreams.h"
 #else
 #include "DOMMediaStream.h"
 #include "MediaStreamGraph.h"
 #include "VideoUtils.h"
 #endif
 #include "MediaConduitInterface.h"
+#include "MediaPipelineFilter.h"
 #include "AudioSegment.h"
 #include "mozilla/ReentrantMonitor.h"
 #include "SrtpFlow.h"
 #include "databuffer.h"
 #include "runnable_utils.h"
 #include "transportflow.h"
 
 #ifdef MOZILLA_INTERNAL_API
 #include "VideoSegment.h"
 #endif
 
+#include "webrtc/modules/rtp_rtcp/interface/rtp_header_parser.h"
+
 namespace mozilla {
 
 // A class that represents the pipeline of audio and video
 // The dataflow looks like:
 //
 // TRANSMIT
 // CaptureDevice -> stream -> [us] -> conduit -> [us] -> transport -> network
 //
@@ -75,42 +78,34 @@ class MediaPipeline : public sigslot::ha
                 RefPtr<MediaSessionConduit> conduit,
                 RefPtr<TransportFlow> rtp_transport,
                 RefPtr<TransportFlow> rtcp_transport)
       : direction_(direction),
         stream_(stream),
         track_id_(track_id),
         level_(level),
         conduit_(conduit),
-        rtp_transport_(rtp_transport),
-        rtp_state_(MP_CONNECTING),
-        rtcp_transport_(rtcp_transport),
-        rtcp_state_(MP_CONNECTING),
+        rtp_(rtp_transport, rtcp_transport ? RTP : MUX),
+        rtcp_(rtcp_transport ? rtcp_transport : rtp_transport,
+              rtcp_transport ? RTCP : MUX),
         main_thread_(main_thread),
         sts_thread_(sts_thread),
-        rtp_send_srtp_(),
-        rtcp_send_srtp_(),
-        rtp_recv_srtp_(),
-        rtcp_recv_srtp_(),
         rtp_packets_sent_(0),
         rtcp_packets_sent_(0),
         rtp_packets_received_(0),
         rtcp_packets_received_(0),
         rtp_bytes_sent_(0),
         rtp_bytes_received_(0),
         pc_(pc),
         description_() {
       // To indicate rtcp-mux rtcp_transport should be nullptr.
       // Therefore it's an error to send in the same flow for
       // both rtp and rtcp.
-      MOZ_ASSERT(rtp_transport_ != rtcp_transport_);
+      MOZ_ASSERT(rtp_transport != rtcp_transport);
 
-      if (!rtcp_transport_) {
-        rtcp_transport_ = rtp_transport;
-      }
       // PipelineTransport() will access this->sts_thread_; moved here for safety
       transport_ = new PipelineTransport(this);
   }
 
   virtual ~MediaPipeline();
 
   // Must be called on the STS thread.  Must be called after ShutdownMedia_m().
   void ShutdownTransport_s();
@@ -121,44 +116,61 @@ class MediaPipeline : public sigslot::ha
 
     if (stream_) {
       DetachMediaStream();
     }
   }
 
   virtual nsresult Init();
 
+  // When we have offered bundle, the MediaPipelines are created in an
+  // indeterminate state; we do not know whether the answerer will take us
+  // up on our offer. In the meantime, we need to behave in a manner that
+  // errs on the side of packet loss when it is unclear whether an arriving
+  // packet is meant for us. We want to get out of this indeterminate state
+  // ASAP, which is what this function can be used for.
+  void SetUsingBundle_s(bool decision);
+  void UpdateFilterFromRemoteDescription_s(
+      nsAutoPtr<MediaPipelineFilter> filter);
+
   virtual Direction direction() const { return direction_; }
   virtual TrackID trackid() const { return track_id_; }
   virtual int level() const { return level_; }
 
   bool IsDoingRtcpMux() const {
-    return (rtp_transport_ == rtcp_transport_);
+    return (rtp_.type_ == MUX);
   }
 
   int32_t rtp_packets_sent() const { return rtp_packets_sent_; }
   int64_t rtp_bytes_sent() const { return rtp_bytes_sent_; }
   int32_t rtcp_packets_sent() const { return rtcp_packets_sent_; }
   int32_t rtp_packets_received() const { return rtp_packets_received_; }
   int64_t rtp_bytes_received() const { return rtp_bytes_received_; }
   int32_t rtcp_packets_received() const { return rtcp_packets_received_; }
 
   MediaSessionConduit *Conduit() const { return conduit_; }
 
   // Thread counting
   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaPipeline)
 
+  typedef enum {
+    RTP,
+    RTCP,
+    MUX,
+    MAX_RTP_TYPE
+  } RtpType;
+
  protected:
   virtual void DetachMediaStream() {}
 
   // Separate class to allow ref counting
   class PipelineTransport : public TransportInterface {
    public:
     // Implement the TransportInterface functions
-    PipelineTransport(MediaPipeline *pipeline)
+    explicit PipelineTransport(MediaPipeline *pipeline)
         : pipeline_(pipeline),
           sts_thread_(pipeline->sts_thread_) {}
 
     void Detach() { pipeline_ = nullptr; }
     MediaPipeline *pipeline() const { return pipeline_; }
 
     virtual nsresult SendRtpPacket(const void* data, int len);
     virtual nsresult SendRtcpPacket(const void* data, int len);
@@ -167,25 +179,50 @@ class MediaPipeline : public sigslot::ha
     virtual nsresult SendRtpPacket_s(nsAutoPtr<DataBuffer> data);
     virtual nsresult SendRtcpPacket_s(nsAutoPtr<DataBuffer> data);
 
     MediaPipeline *pipeline_;  // Raw pointer to avoid cycles
     nsCOMPtr<nsIEventTarget> sts_thread_;
   };
   friend class PipelineTransport;
 
-  virtual nsresult TransportFailed_s(TransportFlow *flow);  // The transport is down
-  virtual nsresult TransportReady_s(TransportFlow *flow);   // The transport is ready
+  class TransportInfo {
+    public:
+      TransportInfo(RefPtr<TransportFlow> flow, RtpType type) :
+        transport_(flow),
+        state_(MP_CONNECTING),
+        type_(type) {
+        MOZ_ASSERT(flow);
+      }
+
+      RefPtr<TransportFlow> transport_;
+      State state_;
+      RefPtr<SrtpFlow> send_srtp_;
+      RefPtr<SrtpFlow> recv_srtp_;
+      RtpType type_;
+  };
+
+  // The transport is down
+  virtual nsresult TransportFailed_s(TransportInfo &info);
+  // The transport is ready
+  virtual nsresult TransportReady_s(TransportInfo &info);
+  void UpdateRtcpMuxState(TransportInfo &info);
+
+  // Unhooks from signals
+  void DisconnectTransport_s(TransportInfo &info);
+  nsresult ConnectTransport_s(TransportInfo &info);
+
+  TransportInfo* GetTransportInfo_s(TransportFlow *flow);
 
   void increment_rtp_packets_sent(int bytes);
   void increment_rtcp_packets_sent();
   void increment_rtp_packets_received(int bytes);
   void increment_rtcp_packets_received();
 
-  virtual nsresult SendPacket(TransportFlow *flow, const void* data, int len);
+  virtual nsresult SendPacket(TransportFlow *flow, const void *data, int len);
 
   // Process slots on transports
   void StateChange(TransportFlow *flow, TransportLayer::State);
   void RtpPacketReceived(TransportLayer *layer, const unsigned char *data,
                          size_t len);
   void RtcpPacketReceived(TransportLayer *layer, const unsigned char *data,
                           size_t len);
   void PacketReceived(TransportLayer *layer, const unsigned char *data,
@@ -197,50 +234,55 @@ class MediaPipeline : public sigslot::ha
                                 // Used on STS and MediaStreamGraph threads.
   TrackID track_id_;            // The track on the stream.
                                 // Written and used as the stream_;
   int level_; // The m-line index (starting at 1, to match convention)
   RefPtr<MediaSessionConduit> conduit_;  // Our conduit. Written on the main
                                          // thread. Read on STS thread.
 
   // The transport objects. Read/written on STS thread.
-  RefPtr<TransportFlow> rtp_transport_;
-  State rtp_state_;
-  RefPtr<TransportFlow> rtcp_transport_;
-  State rtcp_state_;
+  TransportInfo rtp_;
+  TransportInfo rtcp_;
+  // These are for bundle. We have a separate set because when we have offered
+  // bundle, we do not know whether we will receive traffic on the transport
+  // in this pipeline's m-line, or the transport in the "master" m-line for
+  // the bundle. We need to be ready for either. Once this ambiguity is
+  // resolved, the transport we know that we'll be using will be set in
+  // rtp_transport_ and rtcp_transport_, and these will be unset.
+  // TODO(bcampen@mozilla.com): I'm pretty sure this could be leveraged for
+  // re-offer with a new address on an m-line too, with a little work.
+  nsAutoPtr<TransportInfo> possible_bundle_rtp_;
+  nsAutoPtr<TransportInfo> possible_bundle_rtcp_;
 
   // Pointers to the threads we need. Initialized at creation
   // and used all over the place.
   nsCOMPtr<nsIEventTarget> main_thread_;
   nsCOMPtr<nsIEventTarget> sts_thread_;
 
   // Created on Init. Referenced by the conduit and eventually
   // destroyed on the STS thread.
   RefPtr<PipelineTransport> transport_;
 
-  // Used only on STS thread.
-  RefPtr<SrtpFlow> rtp_send_srtp_;
-  RefPtr<SrtpFlow> rtcp_send_srtp_;
-  RefPtr<SrtpFlow> rtp_recv_srtp_;
-  RefPtr<SrtpFlow> rtcp_recv_srtp_;
-
-  // Written only on STS thread. May be read on other
-  // threads but since there is no mutex, the values
-  // will only be approximate.
+  // Only safe to access from STS thread.
+  // Build into TransportInfo?
   int32_t rtp_packets_sent_;
   int32_t rtcp_packets_sent_;
   int32_t rtp_packets_received_;
   int32_t rtcp_packets_received_;
   int64_t rtp_bytes_sent_;
   int64_t rtp_bytes_received_;
 
   // Written on Init. Read on STS thread.
   std::string pc_;
   std::string description_;
 
+  // Written on Init, all following accesses are on the STS thread.
+  nsAutoPtr<MediaPipelineFilter> filter_;
+  nsAutoPtr<webrtc::RtpHeaderParser> rtp_parser_;
+
  private:
   nsresult Init_s();
 
   bool IsRtp(const unsigned char *data, size_t len);
 };
 
 class GenericReceiveListener : public MediaStreamListener
 {
@@ -339,17 +381,17 @@ class MediaPipelineTransmit : public Med
     domstream_->RemoveDirectListener(listener_);
     domstream_ = nullptr;
     stream_->RemoveListener(listener_);
     // Let the listener be destroyed with the pipeline (or later).
     stream_ = nullptr;
   }
 
   // Override MediaPipeline::TransportReady.
-  virtual nsresult TransportReady_s(TransportFlow *flow);
+  virtual nsresult TransportReady_s(TransportInfo &info);
 
   // Separate class to allow ref counting
   class PipelineListener : public MediaStreamDirectListener {
    friend class MediaPipelineTransmit;
    public:
     PipelineListener(const RefPtr<MediaSessionConduit>& conduit)
       : conduit_(conduit),
         active_(false),
@@ -440,21 +482,36 @@ class MediaPipelineReceive : public Medi
   MediaPipelineReceive(const std::string& pc,
                        nsCOMPtr<nsIEventTarget> main_thread,
                        nsCOMPtr<nsIEventTarget> sts_thread,
                        MediaStream *stream,
                        TrackID track_id,
                        int level,
                        RefPtr<MediaSessionConduit> conduit,
                        RefPtr<TransportFlow> rtp_transport,
-                       RefPtr<TransportFlow> rtcp_transport) :
+                       RefPtr<TransportFlow> rtcp_transport,
+                       RefPtr<TransportFlow> bundle_rtp_transport,
+                       RefPtr<TransportFlow> bundle_rtcp_transport,
+                       nsAutoPtr<MediaPipelineFilter> filter) :
       MediaPipeline(pc, RECEIVE, main_thread, sts_thread,
                     stream, track_id, level, conduit, rtp_transport,
                     rtcp_transport),
       segments_added_(0) {
+    filter_ = filter;
+    rtp_parser_ = webrtc::RtpHeaderParser::Create();
+    if (bundle_rtp_transport) {
+      if (bundle_rtcp_transport) {
+        MOZ_ASSERT(bundle_rtp_transport != bundle_rtcp_transport);
+        possible_bundle_rtp_ = new TransportInfo(bundle_rtp_transport, RTP);
+        possible_bundle_rtcp_ = new TransportInfo(bundle_rtcp_transport, RTCP);
+      } else {
+        possible_bundle_rtp_ = new TransportInfo(bundle_rtp_transport, MUX);
+        possible_bundle_rtcp_ = new TransportInfo(bundle_rtp_transport, MUX);
+      }
+    }
   }
 
   int segments_added() const { return segments_added_; }
 
  protected:
   int segments_added_;
 
  private:
@@ -468,20 +525,24 @@ class MediaPipelineReceiveAudio : public
   MediaPipelineReceiveAudio(const std::string& pc,
                             nsCOMPtr<nsIEventTarget> main_thread,
                             nsCOMPtr<nsIEventTarget> sts_thread,
                             MediaStream *stream,
                             TrackID track_id,
                             int level,
                             RefPtr<AudioSessionConduit> conduit,
                             RefPtr<TransportFlow> rtp_transport,
-                            RefPtr<TransportFlow> rtcp_transport) :
+                            RefPtr<TransportFlow> rtcp_transport,
+                            RefPtr<TransportFlow> bundle_rtp_transport,
+                            RefPtr<TransportFlow> bundle_rtcp_transport,
+                            nsAutoPtr<MediaPipelineFilter> filter) :
       MediaPipelineReceive(pc, main_thread, sts_thread,
                            stream, track_id, level, conduit, rtp_transport,
-                           rtcp_transport),
+                           rtcp_transport, bundle_rtp_transport,
+                           bundle_rtcp_transport, filter),
       listener_(new PipelineListener(stream->AsSourceStream(),
                                      track_id, conduit)) {
   }
 
   virtual void DetachMediaStream() {
     ASSERT_ON_THREAD(main_thread_);
     listener_->EndTrack();
     stream_->RemoveListener(listener_);
@@ -531,20 +592,24 @@ class MediaPipelineReceiveVideo : public
   MediaPipelineReceiveVideo(const std::string& pc,
                             nsCOMPtr<nsIEventTarget> main_thread,
                             nsCOMPtr<nsIEventTarget> sts_thread,
                             MediaStream *stream,
                             TrackID track_id,
                             int level,
                             RefPtr<VideoSessionConduit> conduit,
                             RefPtr<TransportFlow> rtp_transport,
-                            RefPtr<TransportFlow> rtcp_transport) :
+                            RefPtr<TransportFlow> rtcp_transport,
+                            RefPtr<TransportFlow> bundle_rtp_transport,
+                            RefPtr<TransportFlow> bundle_rtcp_transport,
+                            nsAutoPtr<MediaPipelineFilter> filter) :
       MediaPipelineReceive(pc, main_thread, sts_thread,
                            stream, track_id, level, conduit, rtp_transport,
-                           rtcp_transport),
+                           rtcp_transport, bundle_rtp_transport,
+                           bundle_rtcp_transport, filter),
       renderer_(new PipelineRenderer(MOZ_THIS_IN_INITIALIZER_LIST())),
       listener_(new PipelineListener(stream->AsSourceStream(), track_id)) {
   }
 
   // Called on the main thread.
   virtual void DetachMediaStream() {
     ASSERT_ON_THREAD(main_thread_);
 
--- a/media/webrtc/signaling/src/peerconnection/PeerConnectionImpl.cpp
+++ b/media/webrtc/signaling/src/peerconnection/PeerConnectionImpl.cpp
@@ -1322,19 +1322,21 @@ PeerConnectionImpl::GetStats(MediaStream
   for (auto p = pipelines.begin(); p != pipelines.end(); ++p) {
     size_t level = p->get()->level();
     // TODO(bcampen@mozilla.com): I may need to revisit this for bundle.
     // (Bug 786234)
     RefPtr<NrIceMediaStream> temp(mMedia->ice_media_stream(level-1));
     if (temp.get()) {
       streams.push_back(temp);
     } else {
-       CSFLogError(logTag, "Failed to get NrIceMediaStream for level %u "
+       CSFLogError(logTag, "Failed to get NrIceMediaStream for level %zu "
                            "in %s:  %s",
-                           uint32_t(level), __FUNCTION__, mHandle.c_str());
+                           static_cast<size_t>(level),
+                           __FUNCTION__,
+                           mHandle.c_str());
        MOZ_CRASH();
     }
   }
 
   DOMHighResTimeStamp now;
   nsresult rv = GetTimeSinceEpoch(&now);
   NS_ENSURE_SUCCESS(rv, rv);
 
--- a/media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.cpp
+++ b/media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.cpp
@@ -391,16 +391,50 @@ PeerConnectionMedia::GetRemoteStream(int
   if(aIndex < 0 || aIndex >= (int) mRemoteSourceStreams.Length()) {
     return nullptr;
   }
 
   MOZ_ASSERT(mRemoteSourceStreams[aIndex]);
   return mRemoteSourceStreams[aIndex];
 }
 
+bool
+PeerConnectionMedia::SetUsingBundle_m(int level, bool decision)
+{
+  ASSERT_ON_THREAD(mMainThread);
+  for (size_t i = 0; i < mRemoteSourceStreams.Length(); ++i) {
+    if (mRemoteSourceStreams[i]->SetUsingBundle_m(level, decision)) {
+      // Found the MediaPipeline for |level|
+      return true;
+    }
+  }
+  CSFLogWarn(logTag, "Could not locate level %d to set bundle flag to %s",
+                     static_cast<int>(level),
+                     decision ? "true" : "false");
+  return false;
+}
+
+bool
+PeerConnectionMedia::UpdateFilterFromRemoteDescription_m(
+    int level,
+    nsAutoPtr<mozilla::MediaPipelineFilter> filter)
+{
+  ASSERT_ON_THREAD(mMainThread);
+  for (size_t i = 0; i < mRemoteSourceStreams.Length(); ++i) {
+    if (mRemoteSourceStreams[i]->UpdateFilterFromRemoteDescription_m(level,
+                                                                     filter)) {
+      // Found the MediaPipeline for |level|
+      return true;
+    }
+  }
+  CSFLogWarn(logTag, "Could not locate level %d to update filter",
+                     static_cast<int>(level));
+  return false;
+}
+
 nsresult
 PeerConnectionMedia::AddRemoteStream(nsRefPtr<RemoteSourceStreamInfo> aInfo,
   int *aIndex)
 {
   MOZ_ASSERT(aIndex);
 
   *aIndex = mRemoteSourceStreams.Length();
 
@@ -498,10 +532,73 @@ RemoteSourceStreamInfo::StorePipeline(in
   }
   //TODO: Revisit once we start supporting multiple streams or multiple tracks
   // of same type
   mPipelines[aTrack] = aPipeline;
   //TODO: move to attribute on Pipeline
   mTypes[aTrack] = aIsVideo;
 }
 
+RefPtr<MediaPipeline> RemoteSourceStreamInfo::GetPipelineByLevel_m(int level) {
+  ASSERT_ON_THREAD(mParent->GetMainThread());
+  for (auto p = mPipelines.begin(); p != mPipelines.end(); ++p) {
+    if (p->second->level() == level) {
+      return p->second;
+    }
+  }
+  return nullptr;
+}
+
+bool RemoteSourceStreamInfo::UpdateFilterFromRemoteDescription_m(
+    int aLevel,
+    nsAutoPtr<mozilla::MediaPipelineFilter> aFilter) {
+  ASSERT_ON_THREAD(mParent->GetMainThread());
+
+  if (!mMediaStream) {
+    // Guard against dispatching once we've started teardown, since we don't
+    // want the RefPtr<MediaPipeline> being the last one standing on the call
+    // to MediaPipeline::UpdateFilterFromRemoteDescription_s; it is not safe
+    // to delete a MediaPipeline anywhere other than the main thread.
+    return false;
+  }
+
+  RefPtr<MediaPipeline> pipeline(GetPipelineByLevel_m(aLevel));
+
+  if (pipeline) {
+    RUN_ON_THREAD(mParent->GetSTSThread(),
+                  WrapRunnable(
+                      pipeline,
+                      &MediaPipeline::UpdateFilterFromRemoteDescription_s,
+                      aFilter
+                  ),
+                  NS_DISPATCH_NORMAL);
+    return true;
+  }
+  return false;
+}
+
+bool RemoteSourceStreamInfo::SetUsingBundle_m(int aLevel, bool decision) {
+  ASSERT_ON_THREAD(mParent->GetMainThread());
+
+  if (!mMediaStream) {
+    // Guard against dispatching once we've started teardown, since we don't
+    // want the RefPtr<MediaPipeline> being the last one standing on the call
+    // to MediaPipeline::SetUsingBundle_s; it is not safe
+    // to delete a MediaPipeline anywhere other than the main thread.
+    return false;
+  }
+
+  RefPtr<MediaPipeline> pipeline(GetPipelineByLevel_m(aLevel));
+
+  if (pipeline) {
+    RUN_ON_THREAD(mParent->GetSTSThread(),
+                  WrapRunnable(
+                      pipeline,
+                      &MediaPipeline::SetUsingBundle_s,
+                      decision
+                  ),
+                  NS_DISPATCH_NORMAL);
+    return true;
+  }
+  return false;
+}
 
 }  // namespace sipcc
--- a/media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.h
+++ b/media/webrtc/signaling/src/peerconnection/PeerConnectionMedia.h
@@ -239,24 +239,30 @@ class RemoteSourceStreamInfo : public So
       mTrackTypeHints(0) {}
 
   DOMMediaStream* GetMediaStream() {
     return mMediaStream;
   }
   void StorePipeline(int aTrack, bool aIsVideo,
                      mozilla::RefPtr<mozilla::MediaPipeline> aPipeline);
 
+  bool UpdateFilterFromRemoteDescription_m(
+      int aLevel,
+      nsAutoPtr<mozilla::MediaPipelineFilter> aFilter);
+  bool SetUsingBundle_m(int aLevel, bool decision);
+
   void DetachTransport_s();
   void DetachMedia_m();
 
   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RemoteSourceStreamInfo)
 
 public:
   DOMMediaStream::TrackTypeHints mTrackTypeHints;
  private:
+  mozilla::RefPtr<mozilla::MediaPipeline> GetPipelineByLevel_m(int level);
   std::map<int, bool> mTypes;
 };
 
 class PeerConnectionMedia : public sigslot::has_slots<> {
  public:
   PeerConnectionMedia(PeerConnectionImpl *parent);
   ~PeerConnectionMedia() {}
 
@@ -291,16 +297,21 @@ class PeerConnectionMedia : public sigsl
 
   // Get a specific remote stream
   uint32_t RemoteStreamsLength()
   {
     return mRemoteSourceStreams.Length();
   }
   RemoteSourceStreamInfo* GetRemoteStream(int index);
 
+  bool SetUsingBundle_m(int level, bool decision);
+  bool UpdateFilterFromRemoteDescription_m(
+      int level,
+      nsAutoPtr<mozilla::MediaPipelineFilter> filter);
+
   // Add a remote stream. Returns the index in index
   nsresult AddRemoteStream(nsRefPtr<RemoteSourceStreamInfo> aInfo, int *aIndex);
   nsresult AddRemoteStreamHint(int aIndex, bool aIsVideo);
 
   const nsCOMPtr<nsIThread>& GetMainThread() const { return mMainThread; }
   const nsCOMPtr<nsIEventTarget>& GetSTSThread() const { return mSTSThread; }
 
   // Get a transport flow either RTP/RTCP for a particular stream
--- a/media/webrtc/signaling/test/mediapipeline_unittest.cpp
+++ b/media/webrtc/signaling/test/mediapipeline_unittest.cpp
@@ -109,20 +109,30 @@ class TestAgent {
  public:
   TestAgent() :
       audio_config_(109, "opus", 48000, 960, 2, 64000),
       audio_conduit_(mozilla::AudioSessionConduit::Create(nullptr)),
       audio_(),
       audio_pipeline_() {
   }
 
-  void ConnectSocket(PRFileDesc *fd, bool client, bool isRtcp) {
+  void ConnectRtpSocket(PRFileDesc *fd, bool client) {
+    ConnectSocket(&audio_rtp_transport_, fd, client);
+  }
+
+  void ConnectRtcpSocket(PRFileDesc *fd, bool client) {
+    ConnectSocket(&audio_rtcp_transport_, fd, client);
+  }
+
+  void ConnectBundleSocket(PRFileDesc *fd, bool client) {
+    ConnectSocket(&bundle_transport_, fd, client);
+  }
+
+  void ConnectSocket(TransportInfo *transport, PRFileDesc *fd, bool client) {
     nsresult res;
-    TransportInfo *transport = isRtcp ?
-      &audio_rtcp_transport_ : &audio_rtp_transport_;
 
     transport->Init(client);
 
     mozilla::SyncRunnable::DispatchToThread(
       test_utils->sts_target(),
       WrapRunnable(transport->prsock_, &TransportLayerPrsock::Import, fd, &res));
     if (!NS_SUCCEEDED(res)) {
       transport->FreeLayers();
@@ -145,16 +155,17 @@ class TestAgent {
 
     ASSERT_TRUE(NS_SUCCEEDED(ret));
   }
 
   void StopInt() {
     audio_->GetStream()->Stop();
     audio_rtp_transport_.Stop();
     audio_rtcp_transport_.Stop();
+    bundle_transport_.Stop();
     if (audio_pipeline_)
       audio_pipeline_->ShutdownTransport_s();
   }
 
   void Stop() {
     MOZ_MTLOG(ML_DEBUG, "Stopping");
 
     if (audio_pipeline_)
@@ -171,57 +182,73 @@ class TestAgent {
 
  protected:
   mozilla::AudioCodecConfig audio_config_;
   mozilla::RefPtr<mozilla::MediaSessionConduit> audio_conduit_;
   nsRefPtr<DOMMediaStream> audio_;
   mozilla::RefPtr<mozilla::MediaPipeline> audio_pipeline_;
   TransportInfo audio_rtp_transport_;
   TransportInfo audio_rtcp_transport_;
+  TransportInfo bundle_transport_;
 };
 
 class TestAgentSend : public TestAgent {
  public:
+  TestAgentSend() : use_bundle_(false) {}
+
   virtual void CreatePipelines_s(bool aIsRtcpMux) {
     audio_ = new Fake_DOMMediaStream(new Fake_AudioStreamSource());
 
     mozilla::MediaConduitErrorCode err =
         static_cast<mozilla::AudioSessionConduit *>(audio_conduit_.get())->
         ConfigureSendMediaCodec(&audio_config_);
     EXPECT_EQ(mozilla::kMediaConduitNoError, err);
 
     std::string test_pc("PC");
 
     if (aIsRtcpMux) {
       ASSERT_FALSE(audio_rtcp_transport_.flow_);
     }
 
+    RefPtr<TransportFlow> rtp(audio_rtp_transport_.flow_);
+    RefPtr<TransportFlow> rtcp(audio_rtcp_transport_.flow_);
+
+    if (use_bundle_) {
+      rtp = bundle_transport_.flow_;
+      rtcp = nullptr;
+    }
+
     audio_pipeline_ = new mozilla::MediaPipelineTransmit(
         test_pc,
         nullptr,
         test_utils->sts_target(),
         audio_,
         1,
         1,
         audio_conduit_,
-        audio_rtp_transport_.flow_,
-        audio_rtcp_transport_.flow_);
+        rtp,
+        rtcp);
 
     audio_pipeline_->Init();
   }
 
   int GetAudioRtpCount() {
     return audio_pipeline_->rtp_packets_sent();
   }
 
   int GetAudioRtcpCount() {
     return audio_pipeline_->rtcp_packets_received();
   }
 
+  void SetUsingBundle(bool use_bundle) {
+    use_bundle_ = use_bundle;
+  }
+
  private:
+  bool use_bundle_;
 };
 
 
 class TestAgentReceive : public TestAgent {
  public:
   virtual void CreatePipelines_s(bool aIsRtcpMux) {
     mozilla::SourceMediaStream *audio = new Fake_SourceMediaStream();
     audio->SetPullEnabled(true);
@@ -241,111 +268,196 @@ class TestAgentReceive : public TestAgen
     EXPECT_EQ(mozilla::kMediaConduitNoError, err);
 
     std::string test_pc("PC");
 
     if (aIsRtcpMux) {
       ASSERT_FALSE(audio_rtcp_transport_.flow_);
     }
 
+    // For now, assume bundle always uses rtcp mux
+    RefPtr<TransportFlow> dummy;
+    RefPtr<TransportFlow> bundle_transport;
+    if (bundle_filter_) {
+      bundle_transport = bundle_transport_.flow_;
+    }
+
     audio_pipeline_ = new mozilla::MediaPipelineReceiveAudio(
         test_pc,
         nullptr,
         test_utils->sts_target(),
         audio_->GetStream(), 1, 1,
         static_cast<mozilla::AudioSessionConduit *>(audio_conduit_.get()),
-        audio_rtp_transport_.flow_, audio_rtcp_transport_.flow_);
+        audio_rtp_transport_.flow_,
+        audio_rtcp_transport_.flow_,
+        bundle_transport,
+        dummy,
+        bundle_filter_);
 
     audio_pipeline_->Init();
   }
 
   int GetAudioRtpCount() {
     return audio_pipeline_->rtp_packets_received();
   }
 
   int GetAudioRtcpCount() {
     return audio_pipeline_->rtcp_packets_sent();
   }
 
+  void SetBundleFilter(nsAutoPtr<MediaPipelineFilter> filter) {
+    bundle_filter_ = filter;
+  }
+
+  void SetUsingBundle_s(bool decision) {
+    audio_pipeline_->SetUsingBundle_s(decision);
+  }
+
+  void UpdateFilterFromRemoteDescription_s(
+      nsAutoPtr<MediaPipelineFilter> filter) {
+    audio_pipeline_->UpdateFilterFromRemoteDescription_s(filter);
+  }
  private:
+  nsAutoPtr<MediaPipelineFilter> bundle_filter_;
 };
 
 
 class MediaPipelineTest : public ::testing::Test {
  public:
   MediaPipelineTest() : p1_() {
     rtp_fds_[0] = rtp_fds_[1] = nullptr;
     rtcp_fds_[0] = rtcp_fds_[1] = nullptr;
+    bundle_fds_[0] = bundle_fds_[1] = nullptr;
   }
 
   // Setup transport.
   void InitTransports(bool aIsRtcpMux) {
     // Create RTP related transport.
     PRStatus status =  PR_NewTCPSocketPair(rtp_fds_);
     ASSERT_EQ(status, PR_SUCCESS);
 
     // RTP, DTLS server
     mozilla::SyncRunnable::DispatchToThread(
       test_utils->sts_target(),
-      WrapRunnable(&p1_, &TestAgent::ConnectSocket, rtp_fds_[0], false, false));
+      WrapRunnable(&p1_, &TestAgent::ConnectRtpSocket, rtp_fds_[0], false));
 
     // RTP, DTLS client
     mozilla::SyncRunnable::DispatchToThread(
       test_utils->sts_target(),
-      WrapRunnable(&p2_, &TestAgent::ConnectSocket, rtp_fds_[1], true, false));
+      WrapRunnable(&p2_, &TestAgent::ConnectRtpSocket, rtp_fds_[1], true));
 
     // Create RTCP flows separately if we are not muxing them.
     if(!aIsRtcpMux) {
       status = PR_NewTCPSocketPair(rtcp_fds_);
       ASSERT_EQ(status, PR_SUCCESS);
 
       // RTCP, DTLS server
       mozilla::SyncRunnable::DispatchToThread(
         test_utils->sts_target(),
-        WrapRunnable(&p1_, &TestAgent::ConnectSocket, rtcp_fds_[0], false, true));
+        WrapRunnable(&p1_, &TestAgent::ConnectRtcpSocket, rtcp_fds_[0], false));
 
       // RTCP, DTLS client
       mozilla::SyncRunnable::DispatchToThread(
         test_utils->sts_target(),
-        WrapRunnable(&p2_, &TestAgent::ConnectSocket, rtcp_fds_[1], true, true));
+        WrapRunnable(&p2_, &TestAgent::ConnectRtcpSocket, rtcp_fds_[1], true));
     }
+
+    status = PR_NewTCPSocketPair(bundle_fds_);
+    // BUNDLE, DTLS server
+    mozilla::SyncRunnable::DispatchToThread(
+      test_utils->sts_target(),
+      WrapRunnable(&p1_,
+                   &TestAgent::ConnectBundleSocket,
+                   bundle_fds_[0],
+                   false));
+
+    // BUNDLE, DTLS client
+    mozilla::SyncRunnable::DispatchToThread(
+      test_utils->sts_target(),
+      WrapRunnable(&p2_,
+                   &TestAgent::ConnectBundleSocket,
+                   bundle_fds_[1],
+                   true));
+
   }
 
   // Verify RTP and RTCP
-  void TestAudioSend(bool aIsRtcpMux) {
+  void TestAudioSend(bool aIsRtcpMux,
+                     bool bundle = false,
+                     nsAutoPtr<MediaPipelineFilter> localFilter =
+                        nsAutoPtr<MediaPipelineFilter>(nullptr),
+                     nsAutoPtr<MediaPipelineFilter> remoteFilter =
+                        nsAutoPtr<MediaPipelineFilter>(nullptr)) {
+
+    // We do not support testing bundle without rtcp mux, since that doesn't
+    // make any sense.
+    ASSERT_FALSE(!aIsRtcpMux && bundle);
+
+    p1_.SetUsingBundle(bundle);
+    p2_.SetBundleFilter(localFilter);
+
     // Setup transport flows
     InitTransports(aIsRtcpMux);
 
     mozilla::SyncRunnable::DispatchToThread(
       test_utils->sts_target(),
       WrapRunnable(&p1_, &TestAgent::CreatePipelines_s, aIsRtcpMux));
 
     mozilla::SyncRunnable::DispatchToThread(
       test_utils->sts_target(),
       WrapRunnable(&p2_, &TestAgent::CreatePipelines_s, aIsRtcpMux));
 
     p2_.Start();
     p1_.Start();
 
+    // Simulate pre-answer traffic
+    PR_Sleep(500);
+
+    mozilla::SyncRunnable::DispatchToThread(
+      test_utils->sts_target(),
+      WrapRunnable(&p2_, &TestAgentReceive::SetUsingBundle_s, bundle));
+
+    if (bundle) {
+      if (!remoteFilter) {
+        remoteFilter = new MediaPipelineFilter;
+      }
+      mozilla::SyncRunnable::DispatchToThread(
+          test_utils->sts_target(),
+          WrapRunnable(&p2_,
+                       &TestAgentReceive::UpdateFilterFromRemoteDescription_s,
+                       remoteFilter));
+    }
+
+
     // wait for some RTP/RTCP tx and rx to happen
     PR_Sleep(10000);
 
-    ASSERT_GE(p1_.GetAudioRtpCount(), 40);
+    if (bundle) {
+      // Filter should have eaten everything, so no RTCP
+    } else {
+      ASSERT_GE(p1_.GetAudioRtpCount(), 40);
 // TODO: Fix to not fail or crash (Bug 947663)
 //    ASSERT_GE(p2_.GetAudioRtpCount(), 40);
-    ASSERT_GE(p1_.GetAudioRtcpCount(), 1);
-    ASSERT_GE(p2_.GetAudioRtcpCount(), 1);
+      ASSERT_GE(p2_.GetAudioRtcpCount(), 1);
+    }
 
     p1_.Stop();
     p2_.Stop();
   }
 
+  void TestAudioReceiverOffersBundle(bool bundle_accepted,
+      nsAutoPtr<MediaPipelineFilter> localFilter,
+      nsAutoPtr<MediaPipelineFilter> remoteFilter =
+          nsAutoPtr<MediaPipelineFilter>(nullptr)) {
+    TestAudioSend(true, bundle_accepted, localFilter, remoteFilter);
+  }
 protected:
   PRFileDesc *rtp_fds_[2];
   PRFileDesc *rtcp_fds_[2];
+  PRFileDesc *bundle_fds_[2];
   TestAgentSend p1_;
   TestAgentReceive p2_;
 };
 
 class MediaPipelineFilterTest : public ::testing::Test {
   public:
     bool Filter(MediaPipelineFilter& filter,
                 int32_t correlator,
@@ -708,17 +820,17 @@ TEST_F(MediaPipelineFilterTest, TestSSRC
   ASSERT_TRUE(Filter(filter, 7777, 555, 110));
   ASSERT_TRUE(Filter(filter, 0, 555, 110));
   ASSERT_FALSE(Filter(filter, 7778, 555, 110));
   ASSERT_FALSE(Filter(filter, 0, 555, 110));
 }
 
 TEST_F(MediaPipelineFilterTest, TestRemoteSDPNoSSRCs) {
   // If the remote SDP doesn't have SSRCs, right now this is a no-op and
-  // there is no point of even incorporating a filter, but we make the 
+  // there is no point of even incorporating a filter, but we make the
   // behavior consistent to avoid confusion.
   MediaPipelineFilter filter;
   filter.SetCorrelator(7777);
   filter.AddUniquePT(111);
   ASSERT_TRUE(Filter(filter, 7777, 555, 110));
 
   MediaPipelineFilter filter2;
 
@@ -731,16 +843,26 @@ TEST_F(MediaPipelineFilterTest, TestRemo
 TEST_F(MediaPipelineTest, TestAudioSendNoMux) {
   TestAudioSend(false);
 }
 
 TEST_F(MediaPipelineTest, TestAudioSendMux) {
   TestAudioSend(true);
 }
 
+TEST_F(MediaPipelineTest, TestAudioSendBundleOfferedAndDeclined) {
+  nsAutoPtr<MediaPipelineFilter> filter(new MediaPipelineFilter);
+  TestAudioReceiverOffersBundle(false, filter);
+}
+
+TEST_F(MediaPipelineTest, TestAudioSendBundleOfferedAndAccepted) {
+  nsAutoPtr<MediaPipelineFilter> filter(new MediaPipelineFilter);
+  TestAudioReceiverOffersBundle(true, filter);
+}
+
 }  // end namespace
 
 
 int main(int argc, char **argv) {
   test_utils = new MtransportTestUtils();
   // Start the tests
   NSS_NoDB_Init(nullptr);
   NSS_SetDomesticPolicy();