//! End-to-end MCU server-side mix workload, on real speech audio.
//!
//! An **MCU** (Multipoint Control Unit) is a conferencing server that
//! decodes every participant's incoming stream, mixes them in PCM, and
//! re-encodes a per-receiver output. Contrast with an **SFU** (Selective
//! Forwarding Unit) which forwards encoded streams byte-for-byte with no
//! decode — lower CPU, higher bandwidth, no mix.
//!
//! This test simulates the MCU hot loop (decode → mix → encode) and lets
//! us answer three concrete questions with real numbers:
//!
//! 1. **How many concurrent meetings can one CPU core handle?** Reported
//!    as the realtime ratio — "1000 ms of audio processed in 20 ms wall
//!    clock" means one core can handle 50 concurrent meetings at that
//!    configuration.
//! 2. **What fraction of MCU time is actually encode?** Breaks the loop
//!    into decode, mix, and encode phases so a later optimisation can
//!    target the real bottleneck rather than the suspected one.
//! 3. **How much bandwidth does server-side mix save over pure SFU
//!    fanout?** Compares the MCU's outgoing byte total (one stream per
//!    receiver, each a leave-one-out mix) against what an SFU would send
//!    (`P × (P − 1)` stream copies across the meeting).
//!
//! The same test is the natural baseline for future codec work:
//!
//! - **Q15 coefficient-shift fix**: should reduce encoded byte totals on
//!   bass-heavy content (music tests). Compare `bytes_out_mix` before and
//!   after.
//! - **Encoder search optimisation**: should reduce `encode_ns` without
//!   hurting `bytes_out_mix`. Compare the phase breakdown before and
//!   after.
//!
//! Run with `cargo test --test mcu_mix --release -- --nocapture`.

use std::path::{Path, PathBuf};
use std::time::Instant;

use hound::WavReader;
use lac::{decode_frame, encode_frame};
const CORPUS_DIR: &str = "corpus";
|
||
|
||
/// 20 ms frame at 16 kHz — standard voice-chat frame length. Divides
|
||
/// cleanly by every partition order so the encoder search stays on the
|
||
/// dense path.
|
||
const FRAME_SIZE: usize = 320;
|
||
|
||
/// Cap wall-clock runtime by limiting how many frames we actually
|
||
/// process. At 20 ms/frame, 1500 frames is 30 s of audio per stream —
|
||
/// enough to average out transient behaviour without making the test take
|
||
/// minutes in CI.
|
||
const MAX_FRAMES: usize = 1500;
|
||
|
||
// ── WAV loading ─────────────────────────────────────────────────────────────
|
||
|
||
fn load_mono(path: &Path) -> Option<Vec<i32>> {
|
||
let mut reader = WavReader::open(path).ok()?;
|
||
let spec = reader.spec();
|
||
if spec.sample_format != hound::SampleFormat::Int
|
||
|| spec.channels != 1
|
||
|| spec.bits_per_sample > 24
|
||
{
|
||
return None;
|
||
}
|
||
let samples: Result<Vec<i32>, _> = reader.samples::<i32>().collect();
|
||
samples.ok()
|
||
}
|
||
|
||
fn corpus(name: &str) -> PathBuf {
|
||
Path::new(CORPUS_DIR).join(name)
|
||
}
|
||
|
||
macro_rules! require {
|
||
($path:expr) => {
|
||
if !$path.exists() {
|
||
eprintln!("skipping: corpus file not found: {}", $path.display());
|
||
return;
|
||
}
|
||
};
|
||
}
|
||
|
||
// ── MCU mix pipeline ────────────────────────────────────────────────────────
|
||
|
||
/// Result of one MCU simulation run.
|
||
#[derive(Default)]
|
||
struct McuStats {
|
||
/// Number of participants in the simulated meeting.
|
||
participants: usize,
|
||
/// Number of frames processed per participant (each frame is `FRAME_SIZE` samples).
|
||
frames: usize,
|
||
/// Total nanoseconds spent in `decode_frame` across all decodes.
|
||
decode_ns: u128,
|
||
/// Total nanoseconds spent doing the leave-one-out mix additions.
|
||
mix_ns: u128,
|
||
/// Total nanoseconds spent in `encode_frame` across all output encodes.
|
||
encode_ns: u128,
|
||
/// Bytes sent *into* the MCU by participants (pre-encoded frames).
|
||
bytes_in: usize,
|
||
/// Bytes sent *out* of the MCU (one stream per receiver, each a
|
||
/// leave-one-out mix).
|
||
bytes_out_mix: usize,
|
||
/// Bytes an SFU would send out for the same meeting: `P × (P − 1)`
|
||
/// stream copies, since every participant receives every other
|
||
/// participant's stream verbatim with no mix.
|
||
bytes_out_fanout: usize,
|
||
}
|
||
|
||
impl McuStats {
|
||
fn total_ns(&self) -> u128 {
|
||
self.decode_ns + self.mix_ns + self.encode_ns
|
||
}
|
||
|
||
/// Audio duration processed, in milliseconds.
|
||
fn audio_ms(&self) -> f64 {
|
||
// FRAME_SIZE samples at 16 kHz = (FRAME_SIZE / 16000) seconds per frame.
|
||
(self.frames as f64) * (FRAME_SIZE as f64) / 16_000.0 * 1000.0
|
||
}
|
||
|
||
/// Realtime multiplier — audio ms per wall-clock ms. A value of 50
|
||
/// means this workload runs 50× faster than realtime, i.e., one core
|
||
/// handles 50 concurrent meetings of this configuration.
|
||
fn realtime_ratio(&self) -> f64 {
|
||
let wall_ms = (self.total_ns() as f64) / 1_000_000.0;
|
||
self.audio_ms() / wall_ms
|
||
}
|
||
}
|
||
|
||
/// Run the decode → mix → encode loop for a simulated MCU meeting with
|
||
/// the given pre-encoded participant streams. Returns timing and byte
|
||
/// accounting for a single run.
|
||
fn simulate_meeting(encoded_streams: &[Vec<Vec<u8>>]) -> McuStats {
|
||
let p = encoded_streams.len();
|
||
assert!(p >= 2, "need at least 2 participants for a meeting");
|
||
let n_frames = encoded_streams.iter().map(|s| s.len()).min().unwrap();
|
||
let n_frames = n_frames.min(MAX_FRAMES);
|
||
|
||
let mut stats = McuStats {
|
||
participants: p,
|
||
frames: n_frames,
|
||
..Default::default()
|
||
};
|
||
|
||
// Ingress bytes: each participant sends all its encoded frames to the MCU.
|
||
for s in encoded_streams {
|
||
stats.bytes_in += s[..n_frames].iter().map(|f| f.len()).sum::<usize>();
|
||
}
|
||
|
||
// Reusable scratch buffer for leave-one-out mixes. The mix for
|
||
// receiver `r` excludes participant `r`'s own voice so they don't
|
||
// hear themselves delayed.
|
||
let mut mix = vec![0i32; FRAME_SIZE];
|
||
|
||
// `frame_idx` is a tick index that selects the same position across
|
||
// every participant's encoded-frame vector. Converting this to
|
||
// `iter().zip(...)` across variable-arity participants complicates
|
||
// the loop body for no runtime benefit, so the integer-index form
|
||
// stays; silence the `needless_range_loop` lint explicitly.
|
||
#[allow(clippy::needless_range_loop)]
|
||
for frame_idx in 0..n_frames {
|
||
// ── Phase 1: decode every incoming stream for this frame tick ──
|
||
let t0 = Instant::now();
|
||
let decoded: Vec<Vec<i32>> = (0..p)
|
||
.map(|i| decode_frame(&encoded_streams[i][frame_idx]).expect("decode"))
|
||
.collect();
|
||
stats.decode_ns += t0.elapsed().as_nanos();
|
||
|
||
// ── Phase 2 + 3: for each receiver, build its leave-one-out mix and encode ──
|
||
for receiver in 0..p {
|
||
// Mix (i32 additions). Wrapping add is appropriate — i32
|
||
// sample range is ±2²³-1 and summing up to P-1 ≤ 31 streams
|
||
// keeps the running total within i32 (max 2^31-1 / 2^23 ≈ 256
|
||
// summands before saturation).
|
||
let t_mix = Instant::now();
|
||
mix.fill(0);
|
||
for (i, stream) in decoded.iter().enumerate() {
|
||
if i == receiver {
|
||
continue;
|
||
}
|
||
for (m, &s) in mix.iter_mut().zip(stream.iter()) {
|
||
*m = m.wrapping_add(s);
|
||
}
|
||
}
|
||
stats.mix_ns += t_mix.elapsed().as_nanos();
|
||
|
||
// Clamp the mix to the 24-bit input range LAC guarantees. In
|
||
// practice, summing 4 typical speech streams almost never
|
||
// exceeds the range (human voice peaks well under full-scale,
|
||
// and constructive superposition of uncorrelated speakers is
|
||
// rare), but clamp anyway to stay inside the codec contract.
|
||
for m in mix.iter_mut() {
|
||
*m = (*m).clamp(-(1 << 23) + 1, (1 << 23) - 1);
|
||
}
|
||
|
||
let t_enc = Instant::now();
|
||
let encoded = encode_frame(&mix);
|
||
stats.encode_ns += t_enc.elapsed().as_nanos();
|
||
stats.bytes_out_mix += encoded.len();
|
||
}
|
||
|
||
// Fanout byte accounting: for each frame, each participant's
|
||
// stream is copied to the P-1 other receivers, with no re-encode.
|
||
// That's P × (P-1) copies, each the original encoded frame size.
|
||
for stream in encoded_streams.iter() {
|
||
stats.bytes_out_fanout += stream[frame_idx].len() * (p - 1);
|
||
}
|
||
}
|
||
|
||
stats
|
||
}
|
||
|
||
// ── Reporting ───────────────────────────────────────────────────────────────
|
||
|
||
fn report(name: &str, s: &McuStats) {
|
||
let total_us = (s.total_ns() as f64) / 1000.0;
|
||
let decode_pct = 100.0 * (s.decode_ns as f64) / (s.total_ns() as f64);
|
||
let mix_pct = 100.0 * (s.mix_ns as f64) / (s.total_ns() as f64);
|
||
let encode_pct = 100.0 * (s.encode_ns as f64) / (s.total_ns() as f64);
|
||
let bandwidth_ratio = s.bytes_out_mix as f64 / s.bytes_out_fanout as f64;
|
||
|
||
eprintln!();
|
||
eprintln!("== {name} ==");
|
||
eprintln!(
|
||
" {} participants × {} frames = {:.1} s of audio per stream",
|
||
s.participants,
|
||
s.frames,
|
||
s.audio_ms() / 1000.0
|
||
);
|
||
eprintln!(
|
||
" wall: {:.1} ms ({:.1}× realtime → {:.0} concurrent meetings/core)",
|
||
total_us / 1000.0,
|
||
s.realtime_ratio(),
|
||
s.realtime_ratio(),
|
||
);
|
||
eprintln!(
|
||
" phase: decode {:>5.1}% mix {:>5.1}% encode {:>5.1}%",
|
||
decode_pct, mix_pct, encode_pct
|
||
);
|
||
eprintln!(
|
||
" bytes: in {} / out_mix {} / out_fanout {} / mix_vs_fanout {:.2}",
|
||
s.bytes_in, s.bytes_out_mix, s.bytes_out_fanout, bandwidth_ratio
|
||
);
|
||
}
|
||
|
||
// ── Tests ───────────────────────────────────────────────────────────────────
|
||
|
||
/// Pre-encode all participant streams into `FRAME_SIZE`-sample frames and
|
||
/// hand the result to `simulate_meeting`. Splitting the setup out of the
|
||
/// simulation keeps the reported `encode_ns` to the *server-side* encode
|
||
/// work only — participant-side encoding is a separate machine and
|
||
/// shouldn't contaminate the MCU measurement.
|
||
fn run(name: &str, stream_names: &[&str]) {
|
||
run_inner(name, stream_names, Activity::Continuous);
|
||
}
|
||
|
||
/// Turn-taking activity pattern. `Continuous` is every participant
|
||
/// producing audio every frame (the pessimistic load-bearing case for
|
||
/// MCU mix/encode cost). `DominantSpeaker { window_frames }` zeroes
|
||
/// every stream except a single rotating speaker for each
|
||
/// `window_frames`-long block. This is the realistic meeting behaviour
|
||
/// — one person talks at a time — under which the MCU compute cost
|
||
/// drops substantially (most mixes collapse to one voice + silence) but
|
||
/// the relative byte savings vs SFU shrink (silence forwards for
|
||
/// essentially free on a pure SFU, so the SFU egress number also falls).
|
||
enum Activity {
|
||
Continuous,
|
||
DominantSpeaker { window_frames: usize },
|
||
}
|
||
|
||
fn run_inner(name: &str, stream_names: &[&str], activity: Activity) {
|
||
let paths: Vec<PathBuf> = stream_names.iter().map(|n| corpus(n)).collect();
|
||
for p in &paths {
|
||
if !p.exists() {
|
||
eprintln!("skipping {name}: missing {}", p.display());
|
||
return;
|
||
}
|
||
}
|
||
|
||
// Cap each stream to the number of samples we'll actually simulate,
|
||
// so we don't spend time pre-encoding the remaining 30+ minutes of
|
||
// audio that `simulate_meeting` will never touch. The AMI files are
|
||
// multi-minute recordings; MAX_FRAMES = 1500 @ 20 ms/frame = 30 s.
|
||
let max_samples = MAX_FRAMES * FRAME_SIZE;
|
||
let mut streams: Vec<Vec<i32>> = paths
|
||
.iter()
|
||
.map(|p| {
|
||
let mut s = load_mono(p).expect("load_mono");
|
||
s.truncate(max_samples);
|
||
s
|
||
})
|
||
.collect();
|
||
|
||
if let Activity::DominantSpeaker { window_frames } = activity {
|
||
apply_dominant_speaker(&mut streams, window_frames);
|
||
}
|
||
|
||
let encoded_streams: Vec<Vec<Vec<u8>>> = streams
|
||
.iter()
|
||
.map(|s| s.chunks(FRAME_SIZE).map(encode_frame).collect())
|
||
.collect();
|
||
|
||
let stats = simulate_meeting(&encoded_streams);
|
||
report(name, &stats);
|
||
|
||
// Sanity assertion: MCU egress must not exceed SFU egress. Equality
|
||
// holds at P=2 (the leave-one-out mix is just the other participant's
|
||
// voice, which encodes to the same size as forwarding their original
|
||
// stream); strict inequality holds for P ≥ 3 because fanout's byte
|
||
// count grows as `P × (P − 1)` while mix grows as `P`.
|
||
//
|
||
// For dominant-speaker the invariant still holds on the egress side
|
||
// (MCU produces one stream per receiver; SFU forwards P-1 streams
|
||
// per receiver) but the absolute margin narrows because silence
|
||
// compresses to ~1 bit per sample and SFU's egress of N-1 silent
|
||
// streams is essentially free.
|
||
assert!(
|
||
stats.bytes_out_mix <= stats.bytes_out_fanout,
|
||
"MCU mix produced more bytes than SFU fanout: mix={} fanout={}",
|
||
stats.bytes_out_mix,
|
||
stats.bytes_out_fanout
|
||
);
|
||
}
|
||
|
||
/// Zero every stream except a single rotating speaker per
|
||
/// `window_frames`-long block. Assumes every stream in `streams` has the
|
||
/// same length and `FRAME_SIZE` divides it; the runtime calls this only
|
||
/// with post-truncate stream slices where both hold.
|
||
///
|
||
/// The rotation uses the block index modulo the participant count, so
|
||
/// each participant gets roughly `total_blocks / participants` turns
|
||
/// spread across the measurement window. At the default
|
||
/// `window_frames = 100` (2 s per turn at 20 ms frames) and
|
||
/// `MAX_FRAMES = 1500`, P=8 gives each participant ~1.9 turns — enough
|
||
/// for the phase breakdown to average over speaker transitions.
|
||
fn apply_dominant_speaker(streams: &mut [Vec<i32>], window_frames: usize) {
|
||
let n = streams.len();
|
||
if n == 0 {
|
||
return;
|
||
}
|
||
// AMI files aren't guaranteed to be identical length even after the
|
||
// common `truncate(max_samples)` step — a short recording stays
|
||
// short. Use the shortest stream as the rotation horizon so the
|
||
// per-stream slice below never runs past a stream's own end.
|
||
let common_len = streams.iter().map(|s| s.len()).min().unwrap_or(0);
|
||
let total_frames = common_len / FRAME_SIZE;
|
||
let window_samples = window_frames * FRAME_SIZE;
|
||
for block in 0..total_frames.div_ceil(window_frames) {
|
||
let active = block % n;
|
||
let start = block * window_samples;
|
||
let end = (start + window_samples).min(common_len);
|
||
for (i, s) in streams.iter_mut().enumerate() {
|
||
if i == active {
|
||
continue;
|
||
}
|
||
for sample in &mut s[start..end] {
|
||
*sample = 0;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
#[test]
|
||
fn mcu_mix_1on1_voice() {
|
||
require!(corpus("ES2002a.Headset-0.wav"));
|
||
run(
|
||
"mcu_mix_1on1_voice",
|
||
&["ES2002a.Headset-0.wav", "ES2002a.Headset-1.wav"],
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn mcu_mix_3people_voice() {
|
||
require!(corpus("ES2002a.Headset-0.wav"));
|
||
run(
|
||
"mcu_mix_3people_voice",
|
||
&[
|
||
"ES2002a.Headset-0.wav",
|
||
"ES2002a.Headset-1.wav",
|
||
"ES2002a.Headset-2.wav",
|
||
],
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn mcu_mix_5people_voice() {
|
||
require!(corpus("ES2002a.Headset-0.wav"));
|
||
run(
|
||
"mcu_mix_5people_voice",
|
||
&[
|
||
"ES2002a.Headset-0.wav",
|
||
"ES2002a.Headset-1.wav",
|
||
"ES2002a.Headset-2.wav",
|
||
"ES2002a.Headset-3.wav",
|
||
"ES2002a.Lapel-0.wav",
|
||
],
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn mcu_mix_8people_voice() {
|
||
// Covers the cross-over where mix bandwidth savings become dramatic —
|
||
// fanout grows quadratically, server-mix stays linear.
|
||
require!(corpus("ES2002a.Headset-0.wav"));
|
||
run(
|
||
"mcu_mix_8people_voice",
|
||
&[
|
||
"ES2002a.Headset-0.wav",
|
||
"ES2002a.Headset-1.wav",
|
||
"ES2002a.Headset-2.wav",
|
||
"ES2002a.Headset-3.wav",
|
||
"ES2002a.Lapel-0.wav",
|
||
"ES2002a.Lapel-1.wav",
|
||
"ES2002a.Lapel-2.wav",
|
||
"ES2002a.Lapel-3.wav",
|
||
],
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn mcu_mix_8people_dominant_speaker() {
|
||
// Same participant set as `mcu_mix_8people_voice`, but only one
|
||
// participant has audio at any given moment — rotating every
|
||
// 2 seconds (100 frames @ 20 ms each). This is the realistic
|
||
// meeting behaviour: one person talks while N-1 listen. The
|
||
// continuous-speech test above is the pessimistic workload
|
||
// (everyone talks simultaneously, which inflates MCU mix cost and
|
||
// SFU fanout byte count); this test shows what the server actually
|
||
// experiences in production traffic.
|
||
//
|
||
// Expected differences vs the continuous variant:
|
||
// - Decode phase: unchanged in wall-clock (silent frames decode
|
||
// about as cheap as voice frames — the codec still walks every
|
||
// residual).
|
||
// - Mix phase: still O(P²) additions per frame, unchanged.
|
||
// - Encode phase: drops sharply. Most mixes collapse to "one
|
||
// voice + silence + silence + …", which encodes roughly like
|
||
// a single voice stream instead of a P-1-way sum.
|
||
// - Egress ratio (MCU vs SFU): narrows. SFU forwards (P-1) silent
|
||
// streams per receiver nearly for free, so the SFU egress
|
||
// baseline drops faster than MCU egress does.
|
||
require!(corpus("ES2002a.Headset-0.wav"));
|
||
run_inner(
|
||
"mcu_mix_8people_dominant_speaker",
|
||
&[
|
||
"ES2002a.Headset-0.wav",
|
||
"ES2002a.Headset-1.wav",
|
||
"ES2002a.Headset-2.wav",
|
||
"ES2002a.Headset-3.wav",
|
||
"ES2002a.Lapel-0.wav",
|
||
"ES2002a.Lapel-1.wav",
|
||
"ES2002a.Lapel-2.wav",
|
||
"ES2002a.Lapel-3.wav",
|
||
],
|
||
Activity::DominantSpeaker { window_frames: 100 },
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn mcu_mix_16people_voice() {
|
||
// Past the typical full-mesh MCU sweet spot but still plausible for
|
||
// a mid-size meeting before hierarchical routing kicks in. The mix
|
||
// phase does 16 × 15 = 240 sample-wise additions per frame vs
|
||
// 8 × 7 = 56 at P=8, so the quadratic mix term starts showing up in
|
||
// the phase breakdown. Larger real meetings (P > 20) are typically
|
||
// routed via dominant-speaker selection rather than full-mesh mix,
|
||
// so this is a reasonable ceiling for the full-mesh data point.
|
||
require!(corpus("ES2002a.Headset-0.wav"));
|
||
run(
|
||
"mcu_mix_16people_voice",
|
||
&[
|
||
"ES2002a.Headset-0.wav",
|
||
"ES2002a.Headset-1.wav",
|
||
"ES2002a.Headset-2.wav",
|
||
"ES2002a.Headset-3.wav",
|
||
"ES2002a.Lapel-0.wav",
|
||
"ES2002a.Lapel-1.wav",
|
||
"ES2002a.Lapel-2.wav",
|
||
"ES2002a.Lapel-3.wav",
|
||
"ES2002a.Array1-01.wav",
|
||
"ES2002a.Array1-02.wav",
|
||
"ES2002a.Array1-03.wav",
|
||
"ES2002a.Array1-04.wav",
|
||
"ES2002a.Array1-05.wav",
|
||
"ES2002a.Array1-06.wav",
|
||
"ES2002a.Array1-07.wav",
|
||
"ES2002a.Array1-08.wav",
|
||
],
|
||
);
|
||
}
|