//! Per-frame latency distribution and heap footprint on real audio.
//!
//! Answers three questions the aggregate benchmark leaves fuzzy:
//!
//! 1. **What's the tail of encode latency?** The mean in a bench says
//!    nothing about how bad the worst frames are. A realtime system needs
//!    P99 below the frame period; otherwise one slow frame blows the
//!    entire deadline. We report P50/P95/P99/max on real speech and music.
//! 2. **What's decode speed in isolation?** The MCU test bundles
//!    decode+mix+encode; this test measures decode alone so we know how
//!    cheap the receive path actually is.
//! 3. **Peak heap per frame?** Important if LAC is embedded alongside a
//!    heavier codec (LVC video) — we want to know how much transient
//!    allocation each audio frame costs. We wrap the global allocator
//!    with a simple counter for the duration of the test.
//!
//! Run with `cargo test --test latency --release -- --nocapture`.
//! Tests serialise themselves via a process-wide mutex so the
//! tracking-allocator counters stay coherent even under `cargo test`'s
//! default multi-threaded runner. `--test-threads=1` is no longer
//! required for correctness but still recommended for clean,
//! in-order console output.
//!
//! # Measurement stability
//!
//! For stable P99 numbers, pin the harness to a fixed core and disable
//! frequency scaling before running:
//!
//! ```text
//! sudo cpupower frequency-set -g performance
//! taskset -c 0 cargo test --test latency --release -- --nocapture
//! ```
//!
//! On a noisy CI runner the P99 values include scheduler jitter and can
//! overstate real-world cost by 2-5×. The P99 hard-deadline asserts
//! below use the frame period as the ceiling, which is still a wide
//! safety margin (~40× headroom in steady state) so jitter alone won't
//! flake the suite.
||
|
||
use std::alloc::{GlobalAlloc, Layout, System};
|
||
use std::path::{Path, PathBuf};
|
||
use std::sync::Mutex;
|
||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||
use std::time::{Duration, Instant};
|
||
|
||
use hound::WavReader;
|
||
use lac::{decode_frame, encode_frame};
|
||
|
||
/// Directory (relative to the crate root) holding the WAV test corpus.
const CORPUS_DIR: &str = "corpus";
|
||
|
||
// ── Tracking allocator ──────────────────────────────────────────────────────

/// Global allocator wrapper tracking current and peak bytes outstanding,
/// plus the number of `alloc` calls. All counters are process-global;
/// `MEASUREMENT_LOCK` (taken at the top of `run_latency`) serialises the
/// latency tests so concurrent test threads don't corrupt each other's
/// measurements.
struct TrackingAllocator;
|
||
|
||
/// Serialises latency tests so the process-global tracking-allocator
/// counters stay coherent under multi-threaded `cargo test`. The lock
/// is taken at the top of `run_latency` and held for the whole
/// measurement window. This is only about allocator-counter coherence;
/// a panic inside a test section still releases the mutex via unwind,
/// so the `PoisonError` path intentionally ignores poison.
static MEASUREMENT_LOCK: Mutex<()> = Mutex::new(());
|
||
|
||
/// Bytes currently outstanding (allocated minus freed) through the
/// tracked allocator. Updated on every alloc/dealloc. Note: this is
/// never zeroed — `reset_peak` snapshots it as the new peak baseline
/// rather than resetting it, since allocations live at reset time
/// (scaffolding, pre-loaded corpus) persist across windows.
static CURRENT_BYTES: AtomicUsize = AtomicUsize::new(0);

/// Peak of `CURRENT_BYTES` observed since the last `reset_peak`.
static PEAK_BYTES: AtomicUsize = AtomicUsize::new(0);

/// Cumulative count of `alloc` calls since the last `reset_peak`. Tracked
/// separately from bytes because a regression can keep peak-bytes flat
/// (same sized buffers, different provenance) while multiplying the call
/// count — e.g. a refactor that replaces one reused `Vec` with a fresh
/// `Vec::new()` per frame.
static CALL_COUNT: AtomicUsize = AtomicUsize::new(0);
|
||
|
||
unsafe impl GlobalAlloc for TrackingAllocator {
|
||
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
|
||
let ptr = unsafe { System.alloc(layout) };
|
||
if !ptr.is_null() {
|
||
let new = CURRENT_BYTES.fetch_add(layout.size(), Ordering::Relaxed) + layout.size();
|
||
// `fetch_max` updates the peak only if `new` exceeds the stored
|
||
// value; cheap, wait-free, sufficient for single-threaded tests.
|
||
PEAK_BYTES.fetch_max(new, Ordering::Relaxed);
|
||
CALL_COUNT.fetch_add(1, Ordering::Relaxed);
|
||
}
|
||
ptr
|
||
}
|
||
|
||
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
|
||
unsafe { System.dealloc(ptr, layout) };
|
||
CURRENT_BYTES.fetch_sub(layout.size(), Ordering::Relaxed);
|
||
}
|
||
}
|
||
|
||
/// Install the tracking wrapper as the process-wide allocator so every
/// heap allocation in this test binary — including inside the codec
/// under test — flows through the counters above.
#[global_allocator]
static ALLOC: TrackingAllocator = TrackingAllocator;
|
||
|
||
/// Reset the peak to the current allocation level so the next measurement
|
||
/// window starts fresh. Also clears the per-window allocation counter.
|
||
/// Call once before the code under test.
|
||
fn reset_peak() {
|
||
PEAK_BYTES.store(CURRENT_BYTES.load(Ordering::Relaxed), Ordering::Relaxed);
|
||
CALL_COUNT.store(0, Ordering::Relaxed);
|
||
}
|
||
|
||
/// Read the maximum bytes outstanding since the last `reset_peak`, minus
|
||
/// the current baseline — i.e., the peak *transient* heap the code under
|
||
/// test held. Ignores allocations that were still live at reset time
|
||
/// (test scaffolding, pre-loaded corpus).
|
||
fn peak_delta_since_reset() -> usize {
|
||
let peak = PEAK_BYTES.load(Ordering::Relaxed);
|
||
let baseline = CURRENT_BYTES.load(Ordering::Relaxed);
|
||
peak.saturating_sub(baseline)
|
||
}
|
||
|
||
/// Read the number of `alloc` calls made since the last `reset_peak`.
/// (Dealloc calls are not counted — see `CALL_COUNT`.)
fn call_count_since_reset() -> usize {
    CALL_COUNT.load(Ordering::Relaxed)
}
|
||
|
||
// ── Corpus loading ──────────────────────────────────────────────────────────
|
||
|
||
fn corpus(name: &str) -> PathBuf {
|
||
Path::new(CORPUS_DIR).join(name)
|
||
}
|
||
|
||
fn load_mono(path: &Path) -> Option<Vec<i32>> {
|
||
let mut reader = WavReader::open(path).ok()?;
|
||
let spec = reader.spec();
|
||
if spec.sample_format != hound::SampleFormat::Int
|
||
|| spec.channels != 1
|
||
|| spec.bits_per_sample > 24
|
||
{
|
||
return None;
|
||
}
|
||
let samples: Result<Vec<i32>, _> = reader.samples::<i32>().collect();
|
||
samples.ok()
|
||
}
|
||
|
||
/// Early-return from the enclosing test with a note on stderr when the
/// named corpus file is missing, instead of failing. Keeps the suite
/// green on machines that don't have the audio corpus checked out.
macro_rules! require {
    ($path:expr) => {
        if !$path.exists() {
            eprintln!("skipping: corpus file not found: {}", $path.display());
            return;
        }
    };
}
|
||
|
||
// ── Latency harness ─────────────────────────────────────────────────────────

/// Distribution summary computed from a sorted `Vec<Duration>`.
struct Dist {
    /// Number of samples summarised.
    count: usize,
    /// Median latency.
    p50: Duration,
    p95: Duration,
    p99: Duration,
    /// Worst single sample.
    max: Duration,
    /// Arithmetic mean.
    mean: Duration,
}

/// Summarise a set of latency samples into percentiles, max and mean.
///
/// Percentiles use nearest-rank on the sorted samples — index
/// `round((n - 1) · frac)` — so each reported value is an actual
/// sample, not an interpolation.
///
/// # Panics
///
/// Panics with an explicit message if `samples` is empty: an empty
/// distribution has no percentiles, and without the check the failure
/// would surface as an opaque divide-by-zero / `unwrap` panic instead.
fn dist_of(mut samples: Vec<Duration>) -> Dist {
    assert!(!samples.is_empty(), "dist_of requires at least one sample");
    samples.sort();
    let count = samples.len();
    let p = |frac: f64| samples[((count as f64 - 1.0) * frac).round() as usize];
    let total: Duration = samples.iter().sum();
    Dist {
        count,
        p50: p(0.50),
        p95: p(0.95),
        p99: p(0.99),
        max: *samples.last().unwrap(),
        mean: total / count as u32,
    }
}
|
||
|
||
/// Render a duration as microseconds with one decimal place, e.g. "12.3µs".
fn fmt_us(d: Duration) -> String {
    let micros = d.as_nanos() as f64 / 1000.0;
    format!("{micros:.1}µs")
}
|
||
|
||
/// Measure encode and decode latency distributions over every
/// `frame_size`-sample chunk in `samples`, and report the peak
/// transient heap (bytes and `alloc` calls) per frame for the encode
/// and decode phases separately.
///
/// `sample_rate` is used to convert the frame-sample count into a
/// real-time frame period, which gates the P99 assertions below.
///
/// # Panics
///
/// Panics if any frame fails to decode, or if encode/decode P99
/// reaches the frame period (real-time deadline missed). Also assumes
/// `samples` yields at least one measurable frame beyond the 32-frame
/// warm-up; `dist_of` panics otherwise.
fn run_latency(name: &str, samples: &[i32], frame_size: usize, sample_rate: u32) {
    // Serialise against every other latency-test thread: the tracking
    // allocator is a single set of process-global atomic counters. The
    // `.unwrap_or_else` handles `PoisonError` so one failed test can't
    // wedge the rest of the suite.
    let _guard = MEASUREMENT_LOCK
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    // `chunks_exact` drops any trailing partial frame — fine for
    // latency statistics.
    let frames: Vec<&[i32]> = samples.chunks_exact(frame_size).collect();
    // Warm-up: the first few encodes allocate one-time arenas and warm
    // caches in ways that don't reflect steady-state behaviour. Skip
    // them in the measurement.
    let warmup = 32.min(frames.len());
    for f in &frames[..warmup] {
        std::hint::black_box(encode_frame(f));
    }

    // ── Encode latency and transient heap ────────────────────────────
    let mut encode_times = Vec::with_capacity(frames.len() - warmup);
    // Belt-and-braces: the loop below re-resets per frame, but start
    // from a clean window anyway.
    reset_peak();
    let mut peak_encode_bytes = 0usize;
    let mut peak_encode_allocs = 0usize;
    let mut encoded_bytes_total = 0usize;
    // Encoded frames are retained so the decode phase replays exactly
    // the same payloads. Pre-sized so pushes don't reallocate inside
    // the measurement window.
    let mut encoded_frames: Vec<Vec<u8>> = Vec::with_capacity(frames.len() - warmup);
    for f in &frames[warmup..] {
        // Per-frame reset: the peak reflects one frame's transient
        // allocation, not the whole run's.
        reset_peak();
        let t = Instant::now();
        let encoded = encode_frame(f);
        encode_times.push(t.elapsed());
        encoded_bytes_total += encoded.len();
        peak_encode_bytes = peak_encode_bytes.max(peak_delta_since_reset());
        peak_encode_allocs = peak_encode_allocs.max(call_count_since_reset());
        // Pushed only after the counters are sampled, so storing the
        // payload isn't attributed to the encoder.
        encoded_frames.push(encoded);
    }

    // ── Decode latency and transient heap ────────────────────────────
    let mut decode_times = Vec::with_capacity(encoded_frames.len());
    let mut peak_decode_bytes = 0usize;
    let mut peak_decode_allocs = 0usize;
    for ef in &encoded_frames {
        reset_peak();
        let t = Instant::now();
        let _samples = decode_frame(ef).expect("decode");
        decode_times.push(t.elapsed());
        peak_decode_bytes = peak_decode_bytes.max(peak_delta_since_reset());
        peak_decode_allocs = peak_decode_allocs.max(call_count_since_reset());
    }

    let enc = dist_of(encode_times);
    let dec = dist_of(decode_times);
    // Frame period = frame_size / sample_rate, expressed in nanoseconds
    // as an integer so the subsequent P99 comparison is exact (no float
    // epsilon). Example: 320 samples at 16 kHz → 20_000_000 ns = 20 ms.
    let frame_period =
        Duration::from_nanos((frame_size as u64 * 1_000_000_000) / sample_rate as u64);

    eprintln!();
    eprintln!("== {name} ({frame_size}-sample frames @ {sample_rate} Hz) ==");
    eprintln!(
        " encode latency: p50={} p95={} p99={} max={} mean={}",
        fmt_us(enc.p50),
        fmt_us(enc.p95),
        fmt_us(enc.p99),
        fmt_us(enc.max),
        fmt_us(enc.mean)
    );
    eprintln!(
        " headroom at p99 vs frame period ({:.1}ms): {:.1}×",
        frame_period.as_micros() as f64 / 1000.0,
        frame_period.as_nanos() as f64 / enc.p99.as_nanos() as f64,
    );
    eprintln!(
        " decode latency: p50={} p95={} p99={} max={} mean={}",
        fmt_us(dec.p50),
        fmt_us(dec.p95),
        fmt_us(dec.p99),
        fmt_us(dec.max),
        fmt_us(dec.mean)
    );
    eprintln!(
        " peak heap / frame: encode={}B decode={}B",
        peak_encode_bytes, peak_decode_bytes
    );
    eprintln!(
        " peak allocs / frame: encode={} decode={}",
        peak_encode_allocs, peak_decode_allocs
    );
    eprintln!(
        " throughput: encoded_frames={} total_encoded_bytes={} avg_bytes/frame={:.1}",
        enc.count,
        encoded_bytes_total,
        encoded_bytes_total as f64 / enc.count as f64
    );

    // Real-time invariant: P99 per-frame cost must stay below the frame
    // period. Steady-state headroom is ~40× so a CI runner with heavy
    // scheduler jitter still passes comfortably; a 40× regression
    // (encoder bug, allocator hot-path change) trips this assert.
    assert!(
        enc.p99 < frame_period,
        "encode P99 {} exceeds frame period {} — real-time deadline missed",
        fmt_us(enc.p99),
        fmt_us(frame_period),
    );
    assert!(
        dec.p99 < frame_period,
        "decode P99 {} exceeds frame period {} — real-time deadline missed",
        fmt_us(dec.p99),
        fmt_us(frame_period),
    );
}
|
||
|
||
// ── Tests ───────────────────────────────────────────────────────────────────

#[test]
fn latency_headset_speech_320() {
    // 320 samples @ 16 kHz = 20 ms frame — standard voice-chat period.
    let wav = corpus("ES2002a.Headset-0.wav");
    require!(wav);
    let pcm = load_mono(&wav).expect("load");
    // Cap to ~60 s of audio so the test doesn't dominate CI time.
    let take = pcm.len().min(16_000 * 60);
    run_latency("headset_speech", &pcm[..take], 320, 16_000);
}
|
||
|
||
#[test]
fn latency_headset_speech_160() {
    // 160 samples @ 16 kHz = 10 ms frame — tighter latency mode used by
    // WebRTC and similar real-time systems.
    let wav = corpus("ES2002a.Headset-0.wav");
    require!(wav);
    let pcm = load_mono(&wav).expect("load");
    // Cap to ~60 s of audio so the test doesn't dominate CI time.
    let take = pcm.len().min(16_000 * 60);
    run_latency("headset_speech_10ms", &pcm[..take], 160, 16_000);
}
|
||
|
||
#[test]
fn latency_headset_speech_480() {
    // 480 samples at 16 kHz = 30 ms frame. The same sample count at
    // 48 kHz is WebRTC's 10 ms full-band frame; the codec only cares
    // about the frame's sample count (not the rate), so this exercises
    // the same search-grid shape a 48 kHz WebRTC deployment would hit.
    // 480 = 2^5 · 3 · 5, so partition orders 0..=5 are valid; 6 and 7
    // are not, which differs from the 2048-sample dense case.
    let wav = corpus("ES2002a.Headset-0.wav");
    require!(wav);
    let pcm = load_mono(&wav).expect("load");
    // Cap to ~60 s of audio so the test doesn't dominate CI time.
    let take = pcm.len().min(16_000 * 60);
    run_latency("headset_speech_480", &pcm[..take], 480, 16_000);
}
|
||
|
||
#[test]
fn latency_headset_speech_prime() {
    // 503 is prime, so only `partition_order = 0` divides it — the
    // encoder skips the partition search entirely and emits a single
    // Rice partition. Covers a code path that power-of-two and
    // smooth-composite frame sizes never reach.
    let wav = corpus("ES2002a.Headset-0.wav");
    require!(wav);
    let pcm = load_mono(&wav).expect("load");
    // Cap to ~60 s of audio so the test doesn't dominate CI time.
    let take = pcm.len().min(16_000 * 60);
    run_latency("headset_speech_prime503", &pcm[..take], 503, 16_000);
}
|
||
|
||
#[test]
fn latency_mixed_meeting_320() {
    // Standard 20 ms (320-sample @ 16 kHz) frames over the mixed
    // all-speaker meeting feed.
    let wav = corpus("ES2002a.Mix-Headset.wav");
    require!(wav);
    let pcm = load_mono(&wav).expect("load");
    // Cap to ~60 s of audio so the test doesn't dominate CI time.
    let take = pcm.len().min(16_000 * 60);
    run_latency("mixed_meeting", &pcm[..take], 320, 16_000);
}
|
||
|
||
#[test]
fn latency_array_speech_320() {
    // Distant mic — residuals are noisier, so encode cost per frame
    // typically rises. Useful to confirm P99 doesn't blow up on less
    // predictable content.
    let wav = corpus("ES2002a.Array1-01.wav");
    require!(wav);
    let pcm = load_mono(&wav).expect("load");
    // Cap to ~60 s of audio so the test doesn't dominate CI time.
    let take = pcm.len().min(16_000 * 60);
    run_latency("array_speech", &pcm[..take], 320, 16_000);
}
|