//! Per-frame latency distribution and heap footprint on real audio.
//!
//! Answers three questions the aggregate benchmark leaves fuzzy:
//!
//! 1. **What's the tail of encode latency?** The mean in a bench says
//!    nothing about how bad the worst frames are. A realtime system needs
//!    P99 below the frame period; otherwise one slow frame blows the
//!    entire deadline. We report P50/P95/P99/max on real speech and music.
//! 2. **What's decode speed in isolation?** The MCU test bundles
//!    decode+mix+encode; this test measures decode alone so we know how
//!    cheap the receive path actually is.
//! 3. **Peak heap per frame?** Important if LAC is embedded alongside a
//!    heavier codec (LVC video) — we want to know how much transient
//!    allocation each audio frame costs. We wrap the global allocator
//!    with a simple counter for the duration of the test.
//!
//! Run with `cargo test --test latency --release -- --nocapture`.
//! Tests serialise themselves via a process-wide mutex so the
//! tracking-allocator counters stay coherent even under `cargo test`'s
//! default multi-threaded runner. `--test-threads=1` is no longer
//! required for correctness but still recommended for clean,
//! in-order console output.
//!
//! # Measurement stability
//!
//! For stable P99 numbers, pin the harness to a fixed core and disable
//! frequency scaling before running:
//!
//! ```text
//! sudo cpupower frequency-set -g performance
//! taskset -c 0 cargo test --test latency --release -- --nocapture
//! ```
//!
//! On a noisy CI runner the P99 values include scheduler jitter and can
//! overstate real-world cost by 2-5×. The P99 hard-deadline asserts
//! below use the frame period as the ceiling, which is still a wide
//! safety margin (~40× headroom in steady state) so jitter alone won't
//! flake the suite.
use std::alloc::{GlobalAlloc, Layout, System}; use std::path::{Path, PathBuf}; use std::sync::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; use hound::WavReader; use lac::{decode_frame, encode_frame}; const CORPUS_DIR: &str = "corpus"; // ── Tracking allocator ────────────────────────────────────────────────────── /// Global allocator wrapper tracking current and peak bytes outstanding. /// Counts are process-global; the per-test `_lock(&TEST_MUTEX)` guard /// (`MEASUREMENT_LOCK.lock()` at the top of each test body) serialises /// access so concurrent test threads don't corrupt each other's /// measurements. struct TrackingAllocator; /// Serialises latency tests so the process-global tracking-allocator /// counters stay coherent under multi-threaded `cargo test`. Each test /// takes the lock at entry and holds it for the whole measurement /// window. This is only about allocator-counter coherence; a panic /// inside a test section will still release the mutex via unwind, so /// the `PoisonError` path intentionally ignores poison. static MEASUREMENT_LOCK: Mutex<()> = Mutex::new(()); /// Cumulative bytes currently allocated from the tracked allocator. Updated /// on every alloc/dealloc; reset to 0 between measurements via /// `reset_peak`. static CURRENT_BYTES: AtomicUsize = AtomicUsize::new(0); /// Peak of `CURRENT_BYTES` observed since the last `reset_peak`. static PEAK_BYTES: AtomicUsize = AtomicUsize::new(0); /// Cumulative count of `alloc` calls since the last `reset_peak`. Tracked /// separately from bytes because a regression can keep peak-bytes flat /// (same sized buffers, different provenance) while multiplying the call /// count — e.g. a refactor that replaces one reused `Vec` with a fresh /// `Vec::new()` per frame. 
static CALL_COUNT: AtomicUsize = AtomicUsize::new(0); unsafe impl GlobalAlloc for TrackingAllocator { unsafe fn alloc(&self, layout: Layout) -> *mut u8 { let ptr = unsafe { System.alloc(layout) }; if !ptr.is_null() { let new = CURRENT_BYTES.fetch_add(layout.size(), Ordering::Relaxed) + layout.size(); // `fetch_max` updates the peak only if `new` exceeds the stored // value; cheap, wait-free, sufficient for single-threaded tests. PEAK_BYTES.fetch_max(new, Ordering::Relaxed); CALL_COUNT.fetch_add(1, Ordering::Relaxed); } ptr } unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { unsafe { System.dealloc(ptr, layout) }; CURRENT_BYTES.fetch_sub(layout.size(), Ordering::Relaxed); } } #[global_allocator] static ALLOC: TrackingAllocator = TrackingAllocator; /// Reset the peak to the current allocation level so the next measurement /// window starts fresh. Also clears the per-window allocation counter. /// Call once before the code under test. fn reset_peak() { PEAK_BYTES.store(CURRENT_BYTES.load(Ordering::Relaxed), Ordering::Relaxed); CALL_COUNT.store(0, Ordering::Relaxed); } /// Read the maximum bytes outstanding since the last `reset_peak`, minus /// the current baseline — i.e., the peak *transient* heap the code under /// test held. Ignores allocations that were still live at reset time /// (test scaffolding, pre-loaded corpus). fn peak_delta_since_reset() -> usize { let peak = PEAK_BYTES.load(Ordering::Relaxed); let baseline = CURRENT_BYTES.load(Ordering::Relaxed); peak.saturating_sub(baseline) } /// Read the number of `alloc` calls made since the last `reset_peak`. 
fn call_count_since_reset() -> usize { CALL_COUNT.load(Ordering::Relaxed) } // ── Corpus loading ────────────────────────────────────────────────────────── fn corpus(name: &str) -> PathBuf { Path::new(CORPUS_DIR).join(name) } fn load_mono(path: &Path) -> Option> { let mut reader = WavReader::open(path).ok()?; let spec = reader.spec(); if spec.sample_format != hound::SampleFormat::Int || spec.channels != 1 || spec.bits_per_sample > 24 { return None; } let samples: Result, _> = reader.samples::().collect(); samples.ok() } macro_rules! require { ($path:expr) => { if !$path.exists() { eprintln!("skipping: corpus file not found: {}", $path.display()); return; } }; } // ── Latency harness ───────────────────────────────────────────────────────── /// Distribution summary computed from a sorted `Vec`. struct Dist { count: usize, p50: Duration, p95: Duration, p99: Duration, max: Duration, mean: Duration, } fn dist_of(mut samples: Vec) -> Dist { samples.sort(); let count = samples.len(); let p = |frac: f64| samples[((count as f64 - 1.0) * frac).round() as usize]; let total: Duration = samples.iter().sum(); Dist { count, p50: p(0.50), p95: p(0.95), p99: p(0.99), max: *samples.last().unwrap(), mean: total / count as u32, } } fn fmt_us(d: Duration) -> String { format!("{:.1}µs", d.as_nanos() as f64 / 1000.0) } /// Measure encode and decode latency distributions over every /// `frame_size`-sample chunk in `samples`. Also reports peak per-frame /// heap allocation (encoder side only — the decoder footprint is /// measured separately in the per-phase breakdown below). /// /// `sample_rate` is used to convert the frame-sample count into a /// real-time frame period, which gates the P99 assertion below. fn run_latency(name: &str, samples: &[i32], frame_size: usize, sample_rate: u32) { // Serialise against every other latency-test thread: the tracking // allocator is a single pair of atomic counters. 
The // `.unwrap_or_else` handles `PoisonError` so one failed test can't // wedge the rest of the suite. let _guard = MEASUREMENT_LOCK .lock() .unwrap_or_else(|poisoned| poisoned.into_inner()); let frames: Vec<&[i32]> = samples.chunks_exact(frame_size).collect(); // Warm-up: first few encodes allocate LLVM+allocator arenas that // don't reflect steady-state behaviour. Skip them in the measurement. let warmup = 32.min(frames.len()); for f in &frames[..warmup] { std::hint::black_box(encode_frame(f)); } // ── Encode latency and transient heap ──────────────────────────── let mut encode_times = Vec::with_capacity(frames.len() - warmup); reset_peak(); let mut peak_encode_bytes = 0usize; let mut peak_encode_allocs = 0usize; let mut encoded_bytes_total = 0usize; let mut encoded_frames: Vec> = Vec::with_capacity(frames.len() - warmup); for f in &frames[warmup..] { reset_peak(); let t = Instant::now(); let encoded = encode_frame(f); encode_times.push(t.elapsed()); encoded_bytes_total += encoded.len(); peak_encode_bytes = peak_encode_bytes.max(peak_delta_since_reset()); peak_encode_allocs = peak_encode_allocs.max(call_count_since_reset()); encoded_frames.push(encoded); } // ── Decode latency and transient heap ──────────────────────────── let mut decode_times = Vec::with_capacity(encoded_frames.len()); let mut peak_decode_bytes = 0usize; let mut peak_decode_allocs = 0usize; for ef in &encoded_frames { reset_peak(); let t = Instant::now(); let _samples = decode_frame(ef).expect("decode"); decode_times.push(t.elapsed()); peak_decode_bytes = peak_decode_bytes.max(peak_delta_since_reset()); peak_decode_allocs = peak_decode_allocs.max(call_count_since_reset()); } let enc = dist_of(encode_times); let dec = dist_of(decode_times); // Frame period = frame_size / sample_rate, expressed in nanoseconds // as an integer so the subsequent P99 comparison is exact (no float // epsilon). Example: 320 samples at 16 kHz → 20_000_000 ns = 20 ms. 
let frame_period = Duration::from_nanos((frame_size as u64 * 1_000_000_000) / sample_rate as u64); eprintln!(); eprintln!("== {name} ({frame_size}-sample frames @ {sample_rate} Hz) =="); eprintln!( " encode latency: p50={} p95={} p99={} max={} mean={}", fmt_us(enc.p50), fmt_us(enc.p95), fmt_us(enc.p99), fmt_us(enc.max), fmt_us(enc.mean) ); eprintln!( " headroom at p99 vs frame period ({:.1}ms): {:.1}×", frame_period.as_micros() as f64 / 1000.0, frame_period.as_nanos() as f64 / enc.p99.as_nanos() as f64, ); eprintln!( " decode latency: p50={} p95={} p99={} max={} mean={}", fmt_us(dec.p50), fmt_us(dec.p95), fmt_us(dec.p99), fmt_us(dec.max), fmt_us(dec.mean) ); eprintln!( " peak heap / frame: encode={}B decode={}B", peak_encode_bytes, peak_decode_bytes ); eprintln!( " peak allocs / frame: encode={} decode={}", peak_encode_allocs, peak_decode_allocs ); eprintln!( " throughput: encoded_frames={} total_encoded_bytes={} avg_bytes/frame={:.1}", enc.count, encoded_bytes_total, encoded_bytes_total as f64 / enc.count as f64 ); // Real-time invariant: P99 per-frame cost must stay below the frame // period. Steady-state headroom is ~40× so a CI runner with heavy // scheduler jitter still passes comfortably; a 40× regression // (encoder bug, allocator hot-path change) trips this assert. assert!( enc.p99 < frame_period, "encode P99 {} exceeds frame period {} — real-time deadline missed", fmt_us(enc.p99), fmt_us(frame_period), ); assert!( dec.p99 < frame_period, "decode P99 {} exceeds frame period {} — real-time deadline missed", fmt_us(dec.p99), fmt_us(frame_period), ); } // ── Tests ─────────────────────────────────────────────────────────────────── #[test] fn latency_headset_speech_320() { // 320 samples @ 16 kHz = 20 ms frame — standard voice-chat period. let path = corpus("ES2002a.Headset-0.wav"); require!(path); let samples = load_mono(&path).expect("load"); // Cap to ~60 s of audio so the test doesn't dominate CI time. 
let cap = (16_000 * 60).min(samples.len()); run_latency("headset_speech", &samples[..cap], 320, 16_000); } #[test] fn latency_headset_speech_160() { // 160 samples @ 16 kHz = 10 ms frame — tighter latency mode used by // WebRTC and similar real-time systems. let path = corpus("ES2002a.Headset-0.wav"); require!(path); let samples = load_mono(&path).expect("load"); let cap = (16_000 * 60).min(samples.len()); run_latency("headset_speech_10ms", &samples[..cap], 160, 16_000); } #[test] fn latency_headset_speech_480() { // 480 samples at 16 kHz = 30 ms frame. The same sample count at // 48 kHz is WebRTC's 10 ms full-band frame; since the codec only // cares about frame sample count (not sample rate) this exercises // the same search-grid shape that a 48 kHz WebRTC deployment would // hit. 480 = 2^5 · 3 · 5, so partition orders 0..=5 are valid; 6 // and 7 are not, which differs from the 2048-sample dense case. let path = corpus("ES2002a.Headset-0.wav"); require!(path); let samples = load_mono(&path).expect("load"); let cap = (16_000 * 60).min(samples.len()); run_latency("headset_speech_480", &samples[..cap], 480, 16_000); } #[test] fn latency_headset_speech_prime() { // 503 is prime, so only `partition_order = 0` divides it — the // encoder skips the partition search entirely and emits a single // Rice partition. Covers a code path that power-of-two and // smooth-composite frame sizes never reach. 
let path = corpus("ES2002a.Headset-0.wav"); require!(path); let samples = load_mono(&path).expect("load"); let cap = (16_000 * 60).min(samples.len()); run_latency("headset_speech_prime503", &samples[..cap], 503, 16_000); } #[test] fn latency_mixed_meeting_320() { let path = corpus("ES2002a.Mix-Headset.wav"); require!(path); let samples = load_mono(&path).expect("load"); let cap = (16_000 * 60).min(samples.len()); run_latency("mixed_meeting", &samples[..cap], 320, 16_000); } #[test] fn latency_array_speech_320() { // Distant mic — residuals are noisier, so encode cost per frame // typically rises. Useful to confirm P99 doesn't blow up on less // predictable content. let path = corpus("ES2002a.Array1-01.wav"); require!(path); let samples = load_mono(&path).expect("load"); let cap = (16_000 * 60).min(samples.len()); run_latency("array_speech", &samples[..cap], 320, 16_000); }