//! End-to-end MCU server-side mix workload, on real speech audio. //! //! An **MCU** (Multipoint Control Unit) is a conferencing server that //! decodes every participant's incoming stream, mixes them in PCM, and //! re-encodes a per-receiver output. Contrast with an **SFU** (Selective //! Forwarding Unit) which forwards encoded streams byte-for-byte with no //! decode — lower CPU, higher bandwidth, no mix. //! //! This test simulates the MCU hot loop (decode → mix → encode) and lets //! us answer three concrete questions with real numbers: //! //! 1. **How many concurrent meetings can one CPU core handle?** Reported //! as the realtime ratio — "1000 ms of audio processed in 20 ms wall //! clock" means one core can handle 50 concurrent meetings at that //! configuration. //! 2. **What fraction of MCU time is actually encode?** Breaks the loop //! into decode, mix, and encode phases so a later optimisation can //! target the real bottleneck rather than the suspected one. //! 3. **How much bandwidth does server-side mix save over pure SFU //! fanout?** Compares the MCU's outgoing byte total (one stream per //! receiver, each a leave-one-out mix) against what an SFU would send //! (`P × (P − 1)` stream copies across the meeting). //! //! The same test is the natural baseline for future codec work: //! //! - **Q15 coefficient-shift fix**: should reduce encoded byte totals on //! bass-heavy content (music tests). Compare `bytes_out_mix` before and //! after. //! - **Encoder search optimisation**: should reduce `encode_ns` without //! hurting `bytes_out_mix`. Compare the phase breakdown before and //! after. //! //! Run with `cargo test --test mcu_mix --release -- --nocapture`. use std::path::{Path, PathBuf}; use std::time::Instant; use hound::WavReader; use lac::{decode_frame, encode_frame}; const CORPUS_DIR: &str = "corpus"; /// 20 ms frame at 16 kHz — standard voice-chat frame length. Divides /// cleanly by every partition order so the encoder search stays on the /// dense path. const FRAME_SIZE: usize = 320; /// Cap wall-clock runtime by limiting how many frames we actually /// process. At 20 ms/frame, 1500 frames is 30 s of audio per stream — /// enough to average out transient behaviour without making the test take /// minutes in CI. const MAX_FRAMES: usize = 1500; // ── WAV loading ───────────────────────────────────────────────────────────── fn load_mono(path: &Path) -> Option> { let mut reader = WavReader::open(path).ok()?; let spec = reader.spec(); if spec.sample_format != hound::SampleFormat::Int || spec.channels != 1 || spec.bits_per_sample > 24 { return None; } let samples: Result, _> = reader.samples::().collect(); samples.ok() } fn corpus(name: &str) -> PathBuf { Path::new(CORPUS_DIR).join(name) } macro_rules! require { ($path:expr) => { if !$path.exists() { eprintln!("skipping: corpus file not found: {}", $path.display()); return; } }; } // ── MCU mix pipeline ──────────────────────────────────────────────────────── /// Result of one MCU simulation run. #[derive(Default)] struct McuStats { /// Number of participants in the simulated meeting. participants: usize, /// Number of frames processed per participant (each frame is `FRAME_SIZE` samples). frames: usize, /// Total nanoseconds spent in `decode_frame` across all decodes. decode_ns: u128, /// Total nanoseconds spent doing the leave-one-out mix additions. mix_ns: u128, /// Total nanoseconds spent in `encode_frame` across all output encodes. encode_ns: u128, /// Bytes sent *into* the MCU by participants (pre-encoded frames). bytes_in: usize, /// Bytes sent *out* of the MCU (one stream per receiver, each a /// leave-one-out mix). bytes_out_mix: usize, /// Bytes an SFU would send out for the same meeting: `P × (P − 1)` /// stream copies, since every participant receives every other /// participant's stream verbatim with no mix. bytes_out_fanout: usize, } impl McuStats { fn total_ns(&self) -> u128 { self.decode_ns + self.mix_ns + self.encode_ns } /// Audio duration processed, in milliseconds. fn audio_ms(&self) -> f64 { // FRAME_SIZE samples at 16 kHz = (FRAME_SIZE / 16000) seconds per frame. (self.frames as f64) * (FRAME_SIZE as f64) / 16_000.0 * 1000.0 } /// Realtime multiplier — audio ms per wall-clock ms. A value of 50 /// means this workload runs 50× faster than realtime, i.e., one core /// handles 50 concurrent meetings of this configuration. fn realtime_ratio(&self) -> f64 { let wall_ms = (self.total_ns() as f64) / 1_000_000.0; self.audio_ms() / wall_ms } } /// Run the decode → mix → encode loop for a simulated MCU meeting with /// the given pre-encoded participant streams. Returns timing and byte /// accounting for a single run. fn simulate_meeting(encoded_streams: &[Vec>]) -> McuStats { let p = encoded_streams.len(); assert!(p >= 2, "need at least 2 participants for a meeting"); let n_frames = encoded_streams.iter().map(|s| s.len()).min().unwrap(); let n_frames = n_frames.min(MAX_FRAMES); let mut stats = McuStats { participants: p, frames: n_frames, ..Default::default() }; // Ingress bytes: each participant sends all its encoded frames to the MCU. for s in encoded_streams { stats.bytes_in += s[..n_frames].iter().map(|f| f.len()).sum::(); } // Reusable scratch buffer for leave-one-out mixes. The mix for // receiver `r` excludes participant `r`'s own voice so they don't // hear themselves delayed. let mut mix = vec![0i32; FRAME_SIZE]; // `frame_idx` is a tick index that selects the same position across // every participant's encoded-frame vector. Converting this to // `iter().zip(...)` across variable-arity participants complicates // the loop body for no runtime benefit, so the integer-index form // stays; silence the `needless_range_loop` lint explicitly. #[allow(clippy::needless_range_loop)] for frame_idx in 0..n_frames { // ── Phase 1: decode every incoming stream for this frame tick ── let t0 = Instant::now(); let decoded: Vec> = (0..p) .map(|i| decode_frame(&encoded_streams[i][frame_idx]).expect("decode")) .collect(); stats.decode_ns += t0.elapsed().as_nanos(); // ── Phase 2 + 3: for each receiver, build its leave-one-out mix and encode ── for receiver in 0..p { // Mix (i32 additions). Wrapping add is appropriate — i32 // sample range is ±2²³-1 and summing up to P-1 ≤ 31 streams // keeps the running total within i32 (max 2^31-1 / 2^23 ≈ 256 // summands before saturation). let t_mix = Instant::now(); mix.fill(0); for (i, stream) in decoded.iter().enumerate() { if i == receiver { continue; } for (m, &s) in mix.iter_mut().zip(stream.iter()) { *m = m.wrapping_add(s); } } stats.mix_ns += t_mix.elapsed().as_nanos(); // Clamp the mix to the 24-bit input range LAC guarantees. In // practice, summing 4 typical speech streams almost never // exceeds the range (human voice peaks well under full-scale, // and constructive superposition of uncorrelated speakers is // rare), but clamp anyway to stay inside the codec contract. for m in mix.iter_mut() { *m = (*m).clamp(-(1 << 23) + 1, (1 << 23) - 1); } let t_enc = Instant::now(); let encoded = encode_frame(&mix); stats.encode_ns += t_enc.elapsed().as_nanos(); stats.bytes_out_mix += encoded.len(); } // Fanout byte accounting: for each frame, each participant's // stream is copied to the P-1 other receivers, with no re-encode. // That's P × (P-1) copies, each the original encoded frame size. for stream in encoded_streams.iter() { stats.bytes_out_fanout += stream[frame_idx].len() * (p - 1); } } stats } // ── Reporting ─────────────────────────────────────────────────────────────── fn report(name: &str, s: &McuStats) { let total_us = (s.total_ns() as f64) / 1000.0; let decode_pct = 100.0 * (s.decode_ns as f64) / (s.total_ns() as f64); let mix_pct = 100.0 * (s.mix_ns as f64) / (s.total_ns() as f64); let encode_pct = 100.0 * (s.encode_ns as f64) / (s.total_ns() as f64); let bandwidth_ratio = s.bytes_out_mix as f64 / s.bytes_out_fanout as f64; eprintln!(); eprintln!("== {name} =="); eprintln!( " {} participants × {} frames = {:.1} s of audio per stream", s.participants, s.frames, s.audio_ms() / 1000.0 ); eprintln!( " wall: {:.1} ms ({:.1}× realtime → {:.0} concurrent meetings/core)", total_us / 1000.0, s.realtime_ratio(), s.realtime_ratio(), ); eprintln!( " phase: decode {:>5.1}% mix {:>5.1}% encode {:>5.1}%", decode_pct, mix_pct, encode_pct ); eprintln!( " bytes: in {} / out_mix {} / out_fanout {} / mix_vs_fanout {:.2}", s.bytes_in, s.bytes_out_mix, s.bytes_out_fanout, bandwidth_ratio ); } // ── Tests ─────────────────────────────────────────────────────────────────── /// Pre-encode all participant streams into `FRAME_SIZE`-sample frames and /// hand the result to `simulate_meeting`. Splitting the setup out of the /// simulation keeps the reported `encode_ns` to the *server-side* encode /// work only — participant-side encoding is a separate machine and /// shouldn't contaminate the MCU measurement. fn run(name: &str, stream_names: &[&str]) { run_inner(name, stream_names, Activity::Continuous); } /// Turn-taking activity pattern. `Continuous` is every participant /// producing audio every frame (the pessimistic load-bearing case for /// MCU mix/encode cost). `DominantSpeaker { window_frames }` zeroes /// every stream except a single rotating speaker for each /// `window_frames`-long block. This is the realistic meeting behaviour /// — one person talks at a time — under which the MCU compute cost /// drops substantially (most mixes collapse to one voice + silence) but /// the relative byte savings vs SFU shrink (silence forwards for /// essentially free on a pure SFU, so the SFU egress number also falls). enum Activity { Continuous, DominantSpeaker { window_frames: usize }, } fn run_inner(name: &str, stream_names: &[&str], activity: Activity) { let paths: Vec = stream_names.iter().map(|n| corpus(n)).collect(); for p in &paths { if !p.exists() { eprintln!("skipping {name}: missing {}", p.display()); return; } } // Cap each stream to the number of samples we'll actually simulate, // so we don't spend time pre-encoding the remaining 30+ minutes of // audio that `simulate_meeting` will never touch. The AMI files are // multi-minute recordings; MAX_FRAMES = 1500 @ 20 ms/frame = 30 s. let max_samples = MAX_FRAMES * FRAME_SIZE; let mut streams: Vec> = paths .iter() .map(|p| { let mut s = load_mono(p).expect("load_mono"); s.truncate(max_samples); s }) .collect(); if let Activity::DominantSpeaker { window_frames } = activity { apply_dominant_speaker(&mut streams, window_frames); } let encoded_streams: Vec>> = streams .iter() .map(|s| s.chunks(FRAME_SIZE).map(encode_frame).collect()) .collect(); let stats = simulate_meeting(&encoded_streams); report(name, &stats); // Sanity assertion: MCU egress must not exceed SFU egress. Equality // holds at P=2 (the leave-one-out mix is just the other participant's // voice, which encodes to the same size as forwarding their original // stream); strict inequality holds for P ≥ 3 because fanout's byte // count grows as `P × (P − 1)` while mix grows as `P`. // // For dominant-speaker the invariant still holds on the egress side // (MCU produces one stream per receiver; SFU forwards P-1 streams // per receiver) but the absolute margin narrows because silence // compresses to ~1 bit per sample and SFU's egress of N-1 silent // streams is essentially free. assert!( stats.bytes_out_mix <= stats.bytes_out_fanout, "MCU mix produced more bytes than SFU fanout: mix={} fanout={}", stats.bytes_out_mix, stats.bytes_out_fanout ); } /// Zero every stream except a single rotating speaker per /// `window_frames`-long block. Assumes every stream in `streams` has the /// same length and `FRAME_SIZE` divides it; the runtime calls this only /// with post-truncate stream slices where both hold. /// /// The rotation uses the block index modulo the participant count, so /// each participant gets roughly `total_blocks / participants` turns /// spread across the measurement window. At the default /// `window_frames = 100` (2 s per turn at 20 ms frames) and /// `MAX_FRAMES = 1500`, P=8 gives each participant ~1.9 turns — enough /// for the phase breakdown to average over speaker transitions. fn apply_dominant_speaker(streams: &mut [Vec], window_frames: usize) { let n = streams.len(); if n == 0 { return; } // AMI files aren't guaranteed to be identical length even after the // common `truncate(max_samples)` step — a short recording stays // short. Use the shortest stream as the rotation horizon so the // per-stream slice below never runs past a stream's own end. let common_len = streams.iter().map(|s| s.len()).min().unwrap_or(0); let total_frames = common_len / FRAME_SIZE; let window_samples = window_frames * FRAME_SIZE; for block in 0..total_frames.div_ceil(window_frames) { let active = block % n; let start = block * window_samples; let end = (start + window_samples).min(common_len); for (i, s) in streams.iter_mut().enumerate() { if i == active { continue; } for sample in &mut s[start..end] { *sample = 0; } } } } #[test] fn mcu_mix_1on1_voice() { require!(corpus("ES2002a.Headset-0.wav")); run( "mcu_mix_1on1_voice", &["ES2002a.Headset-0.wav", "ES2002a.Headset-1.wav"], ); } #[test] fn mcu_mix_3people_voice() { require!(corpus("ES2002a.Headset-0.wav")); run( "mcu_mix_3people_voice", &[ "ES2002a.Headset-0.wav", "ES2002a.Headset-1.wav", "ES2002a.Headset-2.wav", ], ); } #[test] fn mcu_mix_5people_voice() { require!(corpus("ES2002a.Headset-0.wav")); run( "mcu_mix_5people_voice", &[ "ES2002a.Headset-0.wav", "ES2002a.Headset-1.wav", "ES2002a.Headset-2.wav", "ES2002a.Headset-3.wav", "ES2002a.Lapel-0.wav", ], ); } #[test] fn mcu_mix_8people_voice() { // Covers the cross-over where mix bandwidth savings become dramatic — // fanout grows quadratically, server-mix stays linear. require!(corpus("ES2002a.Headset-0.wav")); run( "mcu_mix_8people_voice", &[ "ES2002a.Headset-0.wav", "ES2002a.Headset-1.wav", "ES2002a.Headset-2.wav", "ES2002a.Headset-3.wav", "ES2002a.Lapel-0.wav", "ES2002a.Lapel-1.wav", "ES2002a.Lapel-2.wav", "ES2002a.Lapel-3.wav", ], ); } #[test] fn mcu_mix_8people_dominant_speaker() { // Same participant set as `mcu_mix_8people_voice`, but only one // participant has audio at any given moment — rotating every // 2 seconds (100 frames @ 20 ms each). This is the realistic // meeting behaviour: one person talks while N-1 listen. The // continuous-speech test above is the pessimistic workload // (everyone talks simultaneously, which inflates MCU mix cost and // SFU fanout byte count); this test shows what the server actually // experiences in production traffic. // // Expected differences vs the continuous variant: // - Decode phase: unchanged in wall-clock (silent frames decode // about as cheap as voice frames — the codec still walks every // residual). // - Mix phase: still O(P²) additions per frame, unchanged. // - Encode phase: drops sharply. Most mixes collapse to "one // voice + silence + silence + …", which encodes roughly like // a single voice stream instead of a P-1-way sum. // - Egress ratio (MCU vs SFU): narrows. SFU forwards (P-1) silent // streams per receiver nearly for free, so the SFU egress // baseline drops faster than MCU egress does. require!(corpus("ES2002a.Headset-0.wav")); run_inner( "mcu_mix_8people_dominant_speaker", &[ "ES2002a.Headset-0.wav", "ES2002a.Headset-1.wav", "ES2002a.Headset-2.wav", "ES2002a.Headset-3.wav", "ES2002a.Lapel-0.wav", "ES2002a.Lapel-1.wav", "ES2002a.Lapel-2.wav", "ES2002a.Lapel-3.wav", ], Activity::DominantSpeaker { window_frames: 100 }, ); } #[test] fn mcu_mix_16people_voice() { // Past the typical full-mesh MCU sweet spot but still plausible for // a mid-size meeting before hierarchical routing kicks in. The mix // phase does 16 × 15 = 240 sample-wise additions per frame vs // 8 × 7 = 56 at P=8, so the quadratic mix term starts showing up in // the phase breakdown. Larger real meetings (P > 20) are typically // routed via dominant-speaker selection rather than full-mesh mix, // so this is a reasonable ceiling for the full-mesh data point. require!(corpus("ES2002a.Headset-0.wav")); run( "mcu_mix_16people_voice", &[ "ES2002a.Headset-0.wav", "ES2002a.Headset-1.wav", "ES2002a.Headset-2.wav", "ES2002a.Headset-3.wav", "ES2002a.Lapel-0.wav", "ES2002a.Lapel-1.wav", "ES2002a.Lapel-2.wav", "ES2002a.Lapel-3.wav", "ES2002a.Array1-01.wav", "ES2002a.Array1-02.wav", "ES2002a.Array1-03.wav", "ES2002a.Array1-04.wav", "ES2002a.Array1-05.wav", "ES2002a.Array1-06.wav", "ES2002a.Array1-07.wav", "ES2002a.Array1-08.wav", ], ); }