lac/tests/mcu_mix.rs
//! End-to-end MCU server-side mix workload, on real speech audio.
//!
//! An **MCU** (Multipoint Control Unit) is a conferencing server that
//! decodes every participant's incoming stream, mixes them in PCM, and
//! re-encodes a per-receiver output. Contrast with an **SFU** (Selective
//! Forwarding Unit) which forwards encoded streams byte-for-byte with no
//! decode — lower CPU, higher bandwidth, no mix.
//!
//! This test simulates the MCU hot loop (decode → mix → encode) and lets
//! us answer three concrete questions with real numbers:
//!
//! 1. **How many concurrent meetings can one CPU core handle?** Reported
//! as the realtime ratio — "1000 ms of audio processed in 20 ms wall
//! clock" means one core can handle 50 concurrent meetings at that
//! configuration.
//! 2. **What fraction of MCU time is actually encode?** Breaks the loop
//! into decode, mix, and encode phases so a later optimisation can
//! target the real bottleneck rather than the suspected one.
//! 3. **How much bandwidth does server-side mix save over pure SFU
//! fanout?** Compares the MCU's outgoing byte total (one stream per
//! receiver, each a leave-one-out mix) against what an SFU would send
//! (`P × (P − 1)` stream copies across the meeting).
//!
//! The same test is the natural baseline for future codec work:
//!
//! - **Q15 coefficient-shift fix**: should reduce encoded byte totals on
//! bass-heavy content (music tests). Compare `bytes_out_mix` before and
//! after.
//! - **Encoder search optimisation**: should reduce `encode_ns` without
//! hurting `bytes_out_mix`. Compare the phase breakdown before and
//! after.
//!
//! Run with `cargo test --test mcu_mix --release -- --nocapture`.
use std::path::{Path, PathBuf};
use std::time::Instant;
use hound::WavReader;
use lac::{decode_frame, encode_frame};
const CORPUS_DIR: &str = "corpus";
/// 20 ms frame at 16 kHz — standard voice-chat frame length. Divides
/// cleanly by every partition order so the encoder search stays on the
/// dense path.
const FRAME_SIZE: usize = 320;
/// Cap wall-clock runtime by limiting how many frames we actually
/// process. At 20 ms/frame, 1500 frames is 30 s of audio per stream —
/// enough to average out transient behaviour without making the test take
/// minutes in CI.
const MAX_FRAMES: usize = 1500;
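// Added sanity sketch (not in the original): tie the constants back to the
// arithmetic claimed above, checked at compile time.
// 20 ms at 16 kHz → 16_000 samples/s × 20 ms / 1_000 = 320 samples per frame.
const _: () = assert!(FRAME_SIZE == 16_000 * 20 / 1_000);
// 1_500 frames × 320 samples = 30 s of audio per stream at 16 kHz.
const _: () = assert!(MAX_FRAMES * FRAME_SIZE == 30 * 16_000);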
// ── WAV loading ─────────────────────────────────────────────────────────────
fn load_mono(path: &Path) -> Option<Vec<i32>> {
let mut reader = WavReader::open(path).ok()?;
let spec = reader.spec();
if spec.sample_format != hound::SampleFormat::Int
|| spec.channels != 1
|| spec.bits_per_sample > 24
{
return None;
}
let samples: Result<Vec<i32>, _> = reader.samples::<i32>().collect();
samples.ok()
}
fn corpus(name: &str) -> PathBuf {
Path::new(CORPUS_DIR).join(name)
}
macro_rules! require {
($path:expr) => {
if !$path.exists() {
eprintln!("skipping: corpus file not found: {}", $path.display());
return;
}
};
}
// ── MCU mix pipeline ────────────────────────────────────────────────────────
/// Result of one MCU simulation run.
#[derive(Default)]
struct McuStats {
/// Number of participants in the simulated meeting.
participants: usize,
/// Number of frames processed per participant (each frame is `FRAME_SIZE` samples).
frames: usize,
/// Total nanoseconds spent in `decode_frame` across all decodes.
decode_ns: u128,
/// Total nanoseconds spent doing the leave-one-out mix additions.
mix_ns: u128,
/// Total nanoseconds spent in `encode_frame` across all output encodes.
encode_ns: u128,
/// Bytes sent *into* the MCU by participants (pre-encoded frames).
bytes_in: usize,
/// Bytes sent *out* of the MCU (one stream per receiver, each a
/// leave-one-out mix).
bytes_out_mix: usize,
/// Bytes an SFU would send out for the same meeting: `P × (P − 1)`
/// stream copies, since every participant receives every other
/// participant's stream verbatim with no mix.
bytes_out_fanout: usize,
}
impl McuStats {
fn total_ns(&self) -> u128 {
self.decode_ns + self.mix_ns + self.encode_ns
}
/// Audio duration processed, in milliseconds.
fn audio_ms(&self) -> f64 {
// FRAME_SIZE samples at 16 kHz = (FRAME_SIZE / 16000) seconds per frame.
(self.frames as f64) * (FRAME_SIZE as f64) / 16_000.0 * 1000.0
}
/// Realtime multiplier — audio ms per wall-clock ms. A value of 50
/// means this workload runs 50× faster than realtime, i.e., one core
/// handles 50 concurrent meetings of this configuration.
fn realtime_ratio(&self) -> f64 {
let wall_ms = (self.total_ns() as f64) / 1_000_000.0;
self.audio_ms() / wall_ms
}
}
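// Added illustrative test (hypothetical numbers, no audio involved): walks
// the module-doc arithmetic — 100 frames × 320 samples at 16 kHz is 2 s of
// audio, and a 40 ms wall-clock total gives a 50× realtime ratio, i.e. one
// core could run ~50 such meetings concurrently.
#[test]
fn mcu_stats_realtime_ratio_arithmetic() {
    let s = McuStats {
        frames: 100,
        decode_ns: 10_000_000, // 10 ms
        mix_ns: 10_000_000,    // 10 ms
        encode_ns: 20_000_000, // 20 ms
        ..Default::default()
    };
    assert!((s.audio_ms() - 2_000.0).abs() < 1e-9);
    assert!((s.realtime_ratio() - 50.0).abs() < 1e-9);
}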
/// Run the decode → mix → encode loop for a simulated MCU meeting with
/// the given pre-encoded participant streams. Returns timing and byte
/// accounting for a single run.
fn simulate_meeting(encoded_streams: &[Vec<Vec<u8>>]) -> McuStats {
let p = encoded_streams.len();
assert!(p >= 2, "need at least 2 participants for a meeting");
let n_frames = encoded_streams.iter().map(|s| s.len()).min().unwrap();
let n_frames = n_frames.min(MAX_FRAMES);
let mut stats = McuStats {
participants: p,
frames: n_frames,
..Default::default()
};
// Ingress bytes: each participant sends all its encoded frames to the MCU.
for s in encoded_streams {
stats.bytes_in += s[..n_frames].iter().map(|f| f.len()).sum::<usize>();
}
// Reusable scratch buffer for leave-one-out mixes. The mix for
// receiver `r` excludes participant `r`'s own voice so they don't
// hear themselves delayed.
let mut mix = vec![0i32; FRAME_SIZE];
// `frame_idx` is a tick index that selects the same position across
// every participant's encoded-frame vector. Converting this to
// `iter().zip(...)` across variable-arity participants complicates
// the loop body for no runtime benefit, so the integer-index form
// stays; silence the `needless_range_loop` lint explicitly.
#[allow(clippy::needless_range_loop)]
for frame_idx in 0..n_frames {
// ── Phase 1: decode every incoming stream for this frame tick ──
let t0 = Instant::now();
let decoded: Vec<Vec<i32>> = (0..p)
.map(|i| decode_frame(&encoded_streams[i][frame_idx]).expect("decode"))
.collect();
stats.decode_ns += t0.elapsed().as_nanos();
// ── Phase 2 + 3: for each receiver, build its leave-one-out mix and encode ──
for receiver in 0..p {
// Mix (i32 additions). Wrapping add is safe here — decoded
// samples sit in the 24-bit range ±(2²³ − 1), so summing up to
// P − 1 ≤ 31 streams keeps the running total well within i32
// ((2³¹ − 1) / 2²³ ≈ 256 full-scale summands before overflow;
// see the worked check after this function).
let t_mix = Instant::now();
mix.fill(0);
for (i, stream) in decoded.iter().enumerate() {
if i == receiver {
continue;
}
for (m, &s) in mix.iter_mut().zip(stream.iter()) {
*m = m.wrapping_add(s);
}
}
stats.mix_ns += t_mix.elapsed().as_nanos();
// Clamp the mix to the 24-bit input range LAC guarantees. In
// practice, summing 4 typical speech streams almost never
// exceeds the range (human voice peaks well under full-scale,
// and constructive superposition of uncorrelated speakers is
// rare), but clamp anyway to stay inside the codec contract.
for m in mix.iter_mut() {
*m = (*m).clamp(-(1 << 23) + 1, (1 << 23) - 1);
}
let t_enc = Instant::now();
let encoded = encode_frame(&mix);
stats.encode_ns += t_enc.elapsed().as_nanos();
stats.bytes_out_mix += encoded.len();
}
// Fanout byte accounting: for each frame, each participant's
// stream is copied to the P-1 other receivers, with no re-encode.
// That's P × (P-1) copies, each the original encoded frame size.
for stream in encoded_streams.iter() {
stats.bytes_out_fanout += stream[frame_idx].len() * (p - 1);
}
}
stats
}
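// Added worked arithmetic (illustrative, compile-time): the wrapping-add mix
// above relies on i32 headroom — samples peak at ±(2²³ − 1), and
// (2³¹ − 1) / (2²³ − 1) = 256, so up to 256 full-scale summands fit in i32
// before overflow. The clamp bounds above are exactly that ±(2²³ − 1) range.
const _: () = assert!((i32::MAX as i64) / ((1_i64 << 23) - 1) == 256);
const _: () = assert!((1 << 23) - 1 == 8_388_607);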
// ── Reporting ───────────────────────────────────────────────────────────────
fn report(name: &str, s: &McuStats) {
let total_us = (s.total_ns() as f64) / 1000.0;
let decode_pct = 100.0 * (s.decode_ns as f64) / (s.total_ns() as f64);
let mix_pct = 100.0 * (s.mix_ns as f64) / (s.total_ns() as f64);
let encode_pct = 100.0 * (s.encode_ns as f64) / (s.total_ns() as f64);
let bandwidth_ratio = s.bytes_out_mix as f64 / s.bytes_out_fanout as f64;
eprintln!();
eprintln!("== {name} ==");
eprintln!(
" {} participants × {} frames = {:.1} s of audio per stream",
s.participants,
s.frames,
s.audio_ms() / 1000.0
);
eprintln!(
" wall: {:.1} ms ({:.1}× realtime → {:.0} concurrent meetings/core)",
total_us / 1000.0,
s.realtime_ratio(),
s.realtime_ratio(),
);
eprintln!(
" phase: decode {:>5.1}% mix {:>5.1}% encode {:>5.1}%",
decode_pct, mix_pct, encode_pct
);
eprintln!(
" bytes: in {} / out_mix {} / out_fanout {} / mix_vs_fanout {:.2}",
s.bytes_in, s.bytes_out_mix, s.bytes_out_fanout, bandwidth_ratio
);
}
// ── Tests ───────────────────────────────────────────────────────────────────
/// Pre-encode all participant streams into `FRAME_SIZE`-sample frames and
/// hand the result to `simulate_meeting`. Splitting the setup out of the
/// simulation keeps the reported `encode_ns` to the *server-side* encode
/// work only — participant-side encoding is a separate machine and
/// shouldn't contaminate the MCU measurement.
fn run(name: &str, stream_names: &[&str]) {
run_inner(name, stream_names, Activity::Continuous);
}
/// Turn-taking activity pattern. `Continuous` is every participant
/// producing audio every frame (the pessimistic load-bearing case for
/// MCU mix/encode cost). `DominantSpeaker { window_frames }` zeroes
/// every stream except a single rotating speaker for each
/// `window_frames`-long block. This is the realistic meeting behaviour
/// — one person talks at a time — under which the MCU compute cost
/// drops substantially (most mixes collapse to one voice + silence) but
/// the relative byte savings vs SFU shrink (silence forwards for
/// essentially free on a pure SFU, so the SFU egress number also falls).
enum Activity {
Continuous,
DominantSpeaker { window_frames: usize },
}
fn run_inner(name: &str, stream_names: &[&str], activity: Activity) {
let paths: Vec<PathBuf> = stream_names.iter().map(|n| corpus(n)).collect();
for p in &paths {
if !p.exists() {
eprintln!("skipping {name}: missing {}", p.display());
return;
}
}
// Cap each stream to the number of samples we'll actually simulate,
// so we don't spend time pre-encoding the remaining 30+ minutes of
// audio that `simulate_meeting` will never touch. The AMI files are
// multi-minute recordings; MAX_FRAMES = 1500 @ 20 ms/frame = 30 s.
let max_samples = MAX_FRAMES * FRAME_SIZE;
let mut streams: Vec<Vec<i32>> = paths
.iter()
.map(|p| {
let mut s = load_mono(p).expect("load_mono");
s.truncate(max_samples);
s
})
.collect();
if let Activity::DominantSpeaker { window_frames } = activity {
apply_dominant_speaker(&mut streams, window_frames);
}
let encoded_streams: Vec<Vec<Vec<u8>>> = streams
.iter()
.map(|s| s.chunks(FRAME_SIZE).map(encode_frame).collect())
.collect();
let stats = simulate_meeting(&encoded_streams);
report(name, &stats);
// Sanity assertion: MCU egress must not exceed SFU egress. Equality
// holds at P=2 (the leave-one-out mix is just the other participant's
// voice, which encodes to the same size as forwarding their original
// stream); strict inequality holds for P ≥ 3 because fanout's byte
// count grows as `P × (P − 1)` while mix grows as `P`.
//
// For dominant-speaker the invariant still holds on the egress side
// (MCU produces one stream per receiver; SFU forwards P-1 streams
// per receiver) but the absolute margin narrows because silence
// compresses to ~1 bit per sample and SFU's egress of P − 1 silent
// streams is essentially free.
assert!(
stats.bytes_out_mix <= stats.bytes_out_fanout,
"MCU mix produced more bytes than SFU fanout: mix={} fanout={}",
stats.bytes_out_mix,
stats.bytes_out_fanout
);
}
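// Added illustrative test (pure counting, no I/O): per frame an SFU emits
// P × (P − 1) encoded-frame copies while the MCU emits P mixed frames — the
// linear-vs-quadratic gap behind the mix_vs_fanout ratio and the egress
// assertion in `run_inner` above.
#[test]
fn fanout_copy_count_grows_quadratically() {
    for p in 2..=16usize {
        let sfu_copies = p * (p - 1); // every stream copied to P − 1 receivers
        let mcu_streams = p; // one leave-one-out mix per receiver
        assert!(mcu_streams <= sfu_copies); // equality only at P = 2
        assert_eq!(sfu_copies / mcu_streams, p - 1);
    }
}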
/// Zero every stream except a single rotating speaker per
/// `window_frames`-long block. Assumes every stream in `streams` has the
/// same length and `FRAME_SIZE` divides it; the runtime calls this only
/// with post-truncate stream slices where both hold.
///
/// The rotation uses the block index modulo the participant count, so
/// each participant gets roughly `total_blocks / participants` turns
/// spread across the measurement window. At the default
/// `window_frames = 100` (2 s per turn at 20 ms frames) and
/// `MAX_FRAMES = 1500`, P=8 gives each participant ~1.9 turns — enough
/// for the phase breakdown to average over speaker transitions.
fn apply_dominant_speaker(streams: &mut [Vec<i32>], window_frames: usize) {
let n = streams.len();
if n == 0 {
return;
}
// AMI files aren't guaranteed to be identical length even after the
// common `truncate(max_samples)` step — a short recording stays
// short. Use the shortest stream as the rotation horizon so the
// per-stream slice below never runs past a stream's own end.
let common_len = streams.iter().map(|s| s.len()).min().unwrap_or(0);
let total_frames = common_len / FRAME_SIZE;
let window_samples = window_frames * FRAME_SIZE;
for block in 0..total_frames.div_ceil(window_frames) {
let active = block % n;
let start = block * window_samples;
let end = (start + window_samples).min(common_len);
for (i, s) in streams.iter_mut().enumerate() {
if i == active {
continue;
}
for sample in &mut s[start..end] {
*sample = 0;
}
}
}
}
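// Added unit check (synthetic input, not from the corpus): on two
// constant-valued streams with 2-frame windows, the rotation zeroes exactly
// the inactive participant in each block and alternates the active speaker.
#[test]
fn dominant_speaker_rotation_zeroes_inactive() {
    let mut streams = vec![vec![1i32; FRAME_SIZE * 4]; 2];
    // 4 frames in 2-frame windows → 2 blocks, speakers 0 then 1.
    apply_dominant_speaker(&mut streams, 2);
    // Block 0 (samples 0..2·FRAME_SIZE): participant 0 active, 1 silenced.
    assert!(streams[0][..FRAME_SIZE * 2].iter().all(|&s| s == 1));
    assert!(streams[1][..FRAME_SIZE * 2].iter().all(|&s| s == 0));
    // Block 1 (samples 2·FRAME_SIZE..): participant 1 active, 0 silenced.
    assert!(streams[0][FRAME_SIZE * 2..].iter().all(|&s| s == 0));
    assert!(streams[1][FRAME_SIZE * 2..].iter().all(|&s| s == 1));
}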
#[test]
fn mcu_mix_1on1_voice() {
require!(corpus("ES2002a.Headset-0.wav"));
run(
"mcu_mix_1on1_voice",
&["ES2002a.Headset-0.wav", "ES2002a.Headset-1.wav"],
);
}
#[test]
fn mcu_mix_3people_voice() {
require!(corpus("ES2002a.Headset-0.wav"));
run(
"mcu_mix_3people_voice",
&[
"ES2002a.Headset-0.wav",
"ES2002a.Headset-1.wav",
"ES2002a.Headset-2.wav",
],
);
}
#[test]
fn mcu_mix_5people_voice() {
require!(corpus("ES2002a.Headset-0.wav"));
run(
"mcu_mix_5people_voice",
&[
"ES2002a.Headset-0.wav",
"ES2002a.Headset-1.wav",
"ES2002a.Headset-2.wav",
"ES2002a.Headset-3.wav",
"ES2002a.Lapel-0.wav",
],
);
}
#[test]
fn mcu_mix_8people_voice() {
// Covers the cross-over where mix bandwidth savings become dramatic —
// fanout grows quadratically, server-mix stays linear.
require!(corpus("ES2002a.Headset-0.wav"));
run(
"mcu_mix_8people_voice",
&[
"ES2002a.Headset-0.wav",
"ES2002a.Headset-1.wav",
"ES2002a.Headset-2.wav",
"ES2002a.Headset-3.wav",
"ES2002a.Lapel-0.wav",
"ES2002a.Lapel-1.wav",
"ES2002a.Lapel-2.wav",
"ES2002a.Lapel-3.wav",
],
);
}
#[test]
fn mcu_mix_8people_dominant_speaker() {
// Same participant set as `mcu_mix_8people_voice`, but only one
// participant has audio at any given moment — rotating every
// 2 seconds (100 frames @ 20 ms each). This is the realistic
// meeting behaviour: one person talks while N-1 listen. The
// continuous-speech test above is the pessimistic workload
// (everyone talks simultaneously, which inflates MCU mix cost and
// SFU fanout byte count); this test shows what the server actually
// experiences in production traffic.
//
// Expected differences vs the continuous variant:
// - Decode phase: unchanged in wall-clock time (silent frames decode
// about as cheaply as voice frames — the codec still walks every
// residual).
// - Mix phase: still O(P²) additions per frame, unchanged.
// - Encode phase: drops sharply. Most mixes collapse to "one
// voice + silence + silence + …", which encodes roughly like
// a single voice stream instead of a P-1-way sum.
// - Egress ratio (MCU vs SFU): narrows. SFU forwards (P-1) silent
// streams per receiver nearly for free, so the SFU egress
// baseline drops faster than MCU egress does.
require!(corpus("ES2002a.Headset-0.wav"));
run_inner(
"mcu_mix_8people_dominant_speaker",
&[
"ES2002a.Headset-0.wav",
"ES2002a.Headset-1.wav",
"ES2002a.Headset-2.wav",
"ES2002a.Headset-3.wav",
"ES2002a.Lapel-0.wav",
"ES2002a.Lapel-1.wav",
"ES2002a.Lapel-2.wav",
"ES2002a.Lapel-3.wav",
],
Activity::DominantSpeaker { window_frames: 100 },
);
}
#[test]
fn mcu_mix_16people_voice() {
// Past the typical full-mesh MCU sweet spot but still plausible for
// a mid-size meeting before hierarchical routing kicks in. The mix
// phase does 16 × 15 = 240 sample-wise additions per frame vs
// 8 × 7 = 56 at P=8, so the quadratic mix term starts showing up in
// the phase breakdown. Larger real meetings (P > 20) are typically
// routed via dominant-speaker selection rather than full-mesh mix,
// so this is a reasonable ceiling for the full-mesh data point.
require!(corpus("ES2002a.Headset-0.wav"));
run(
"mcu_mix_16people_voice",
&[
"ES2002a.Headset-0.wav",
"ES2002a.Headset-1.wav",
"ES2002a.Headset-2.wav",
"ES2002a.Headset-3.wav",
"ES2002a.Lapel-0.wav",
"ES2002a.Lapel-1.wav",
"ES2002a.Lapel-2.wav",
"ES2002a.Lapel-3.wav",
"ES2002a.Array1-01.wav",
"ES2002a.Array1-02.wav",
"ES2002a.Array1-03.wav",
"ES2002a.Array1-04.wav",
"ES2002a.Array1-05.wav",
"ES2002a.Array1-06.wav",
"ES2002a.Array1-07.wav",
"ES2002a.Array1-08.wav",
],
);
}