fix(client): keep recording and uplink sync responsive

This commit is contained in:
Brad Stein 2026-04-30 00:26:49 -03:00
parent 9401f2b7cd
commit a384ed2b7b
13 changed files with 347 additions and 66 deletions

6
Cargo.lock generated
View File

@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.15.3" version = "0.15.4"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1676,7 +1676,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.15.3" version = "0.15.4"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1688,7 +1688,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.15.3" version = "0.15.4"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.15.3" version = "0.15.4"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -203,7 +203,7 @@ impl LesavkaClientApp {
const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
crate::uplink_fresh_queue::FreshQueueConfig { crate::uplink_fresh_queue::FreshQueueConfig {
capacity: 32, capacity: 32,
max_age: Duration::from_secs(1), max_age: Duration::from_millis(350),
}; };
#[cfg(not(coverage))] #[cfg(not(coverage))]

View File

@ -1,5 +1,5 @@
pub fn quality_probe_command() -> &'static str { pub fn quality_probe_command() -> &'static str {
"scripts/ci/hygiene_gate.sh && scripts/ci/quality_gate.sh" "scripts/ci/platform_quality_gate.sh"
} }
fn capture_profile_label(capture: &CaptureSizeChoice, stream_caps_label: &str) -> String { fn capture_profile_label(capture: &CaptureSizeChoice, stream_caps_label: &str) -> String {

View File

@ -201,7 +201,7 @@ fn snapshot_json_is_serializable_and_mentions_probe_command() {
quality_probe_command().to_string(), quality_probe_command().to_string(),
); );
let json = report.to_pretty_json().expect("serialize"); let json = report.to_pretty_json().expect("serialize");
assert!(json.contains("quality_gate.sh")); assert!(json.contains("platform_quality_gate.sh"));
assert!(json.contains("routing")); assert!(json.contains("routing"));
assert!(json.contains("view_mode")); assert!(json.contains("view_mode"));
} }
@ -349,10 +349,9 @@ fn snapshot_report_uses_effective_mirrored_capture_profile() {
} }
#[test] #[test]
fn quality_probe_command_mentions_both_gates() { fn quality_probe_command_runs_full_platform_gate() {
let cmd = quality_probe_command(); let cmd = quality_probe_command();
assert!(cmd.contains("hygiene_gate.sh")); assert_eq!(cmd, "scripts/ci/platform_quality_gate.sh");
assert!(cmd.contains("quality_gate.sh"));
} }
#[test] #[test]

View File

@ -6,11 +6,17 @@
save_dir_override: Option<PathBuf>, save_dir_override: Option<PathBuf>,
timer: Option<glib::SourceId>, timer: Option<glib::SourceId>,
frame_dir: Option<PathBuf>, frame_dir: Option<PathBuf>,
output_path: Option<PathBuf>, frame_writer_tx: Option<std::sync::mpsc::Sender<RecordFrameTask>>,
finalize_rx: Option<std::sync::mpsc::Receiver<Result<PathBuf, String>>>,
next_frame_index: u32, next_frame_index: u32,
captured_frames: u32, }
encode_fps: u32,
encode_bitrate_kbit: u32, enum RecordFrameTask {
Frame {
texture: gtk::gdk::Texture,
frame_path: PathBuf,
},
Finish,
} }
fn eye_slug(title: &str) -> &'static str { fn eye_slug(title: &str) -> &'static str {
@ -22,6 +28,11 @@
} }
fn timestamp_slug() -> String { fn timestamp_slug() -> String {
if let Ok(now) = glib::DateTime::now_local()
&& let Ok(stamp) = now.format("%Y%m%d-%H%M%S")
{
return stamp.to_string();
}
let now = SystemTime::now() let now = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.unwrap_or_default(); .unwrap_or_default();
@ -46,18 +57,18 @@
if let Some(raw) = std::env::var_os("XDG_PICTURES_DIR") { if let Some(raw) = std::env::var_os("XDG_PICTURES_DIR") {
let path = expand_home_token(&raw.to_string_lossy()); let path = expand_home_token(&raw.to_string_lossy());
if !path.as_os_str().is_empty() { if !path.as_os_str().is_empty() {
return path.join("Lesavka"); return path.join("lesavka");
} }
} }
if let Some(home) = std::env::var_os("HOME") { if let Some(home) = std::env::var_os("HOME") {
return PathBuf::from(home).join("Pictures").join("Lesavka"); return PathBuf::from(home).join("Pictures").join("lesavka");
} }
if let Some(profile) = std::env::var_os("USERPROFILE") { if let Some(profile) = std::env::var_os("USERPROFILE") {
return PathBuf::from(profile).join("Pictures").join("Lesavka"); return PathBuf::from(profile).join("Pictures").join("lesavka");
} }
std::env::current_dir() std::env::current_dir()
.unwrap_or_else(|_| PathBuf::from(".")) .unwrap_or_else(|_| PathBuf::from("."))
.join("Lesavka") .join("lesavka")
} }
fn ensure_eye_capture_root(override_dir: Option<&PathBuf>) -> Result<PathBuf, String> { fn ensure_eye_capture_root(override_dir: Option<&PathBuf>) -> Result<PathBuf, String> {
@ -127,44 +138,37 @@
(fps, bitrate_kbit) (fps, bitrate_kbit)
} }
fn write_record_frame(state: &mut EyeRecordState, picture: &gtk::Picture) -> Result<(), String> { fn queue_record_frame(state: &mut EyeRecordState, picture: &gtk::Picture) -> Result<(), String> {
let frame_dir = state let frame_dir = state
.frame_dir .frame_dir
.as_ref() .as_ref()
.ok_or_else(|| "recording session is not initialized".to_string())? .ok_or_else(|| "recording session is not initialized".to_string())?
.clone(); .clone();
let frame_writer_tx = state
.frame_writer_tx
.as_ref()
.ok_or_else(|| "recording worker is not initialized".to_string())?
.clone();
let texture = current_eye_texture(picture)?; let texture = current_eye_texture(picture)?;
let frame_path = frame_dir.join(format!("frame-{:06}.png", state.next_frame_index)); let frame_path = frame_dir.join(format!("frame-{:06}.png", state.next_frame_index));
save_texture_png(&texture, &frame_path)?; frame_writer_tx
.send(RecordFrameTask::Frame {
texture,
frame_path,
})
.map_err(|_| "recording worker stopped unexpectedly".to_string())?;
state.next_frame_index = state.next_frame_index.saturating_add(1); state.next_frame_index = state.next_frame_index.saturating_add(1);
state.captured_frames = state.captured_frames.saturating_add(1);
Ok(()) Ok(())
} }
fn finalize_recording(state: &mut EyeRecordState) -> Result<PathBuf, String> { fn encode_recording(
let frame_dir = state frame_dir: &PathBuf,
.frame_dir output_path: &PathBuf,
.take() encode_fps: u32,
.ok_or_else(|| "recording frames were not initialized".to_string())?; encode_bitrate_kbit: u32,
let output_path = state ) -> Result<(), String> {
.output_path
.take()
.ok_or_else(|| "recording output path was not initialized".to_string())?;
let captured_frames = state.captured_frames;
let encode_fps = state.encode_fps.max(1);
let encode_bitrate_kbit = state.encode_bitrate_kbit.max(800);
state.captured_frames = 0;
state.next_frame_index = 0;
state.encode_fps = 0;
state.encode_bitrate_kbit = 0;
if captured_frames < 2 {
let _ = std::fs::remove_dir_all(&frame_dir);
return Err("need at least two captured frames to build a recording".to_string());
}
let frame_pattern = frame_dir.join("frame-%06d.png"); let frame_pattern = frame_dir.join("frame-%06d.png");
let bitrate_arg = format!("{encode_bitrate_kbit}k"); let bitrate_arg = format!("{}k", encode_bitrate_kbit.max(800));
let encode = Command::new("ffmpeg") let encode = Command::new("ffmpeg")
.args([ .args([
"-hide_banner", "-hide_banner",
@ -172,7 +176,7 @@
"error", "error",
"-y", "-y",
"-framerate", "-framerate",
&encode_fps.to_string(), &encode_fps.max(1).to_string(),
"-i", "-i",
&frame_pattern.to_string_lossy(), &frame_pattern.to_string_lossy(),
"-c:v", "-c:v",
@ -180,7 +184,7 @@
"-pix_fmt", "-pix_fmt",
"yuv420p", "yuv420p",
"-r", "-r",
&encode_fps.to_string(), &encode_fps.max(1).to_string(),
"-b:v", "-b:v",
&bitrate_arg, &bitrate_arg,
&output_path.to_string_lossy(), &output_path.to_string_lossy(),
@ -195,7 +199,36 @@
frame_dir.display() frame_dir.display()
)); ));
} }
Ok(())
}
fn run_recording_worker(
frame_rx: std::sync::mpsc::Receiver<RecordFrameTask>,
frame_dir: PathBuf,
output_path: PathBuf,
encode_fps: u32,
encode_bitrate_kbit: u32,
) -> Result<PathBuf, String> {
let mut captured_frames = 0_u32;
loop {
match frame_rx.recv() {
Ok(RecordFrameTask::Frame {
texture,
frame_path,
}) => {
save_texture_png(&texture, &frame_path)?;
captured_frames = captured_frames.saturating_add(1);
}
Ok(RecordFrameTask::Finish) | Err(_) => break,
}
}
if captured_frames < 2 {
let _ = std::fs::remove_dir_all(&frame_dir);
return Err("need at least two captured frames to build a recording".to_string());
}
encode_recording(&frame_dir, &output_path, encode_fps, encode_bitrate_kbit)?;
let _ = std::fs::remove_dir_all(&frame_dir); let _ = std::fs::remove_dir_all(&frame_dir);
Ok(output_path) Ok(output_path)
} }
@ -344,30 +377,65 @@
let record_button = pane.record_button.clone(); let record_button = pane.record_button.clone();
record_button.connect_clicked(move |button| { record_button.connect_clicked(move |button| {
if save_state.borrow().timer.is_some() { if save_state.borrow().timer.is_some() {
let mut state = save_state.borrow_mut(); let finalize_rx = {
if let Some(timer) = state.timer.take() { let mut state = save_state.borrow_mut();
timer.remove(); if let Some(timer) = state.timer.take() {
} timer.remove();
drop(state); }
if let Some(frame_writer_tx) = state.frame_writer_tx.take() {
let _ = frame_writer_tx.send(RecordFrameTask::Finish);
}
state.next_frame_index = 0;
state.frame_dir = None;
state.finalize_rx.take()
};
let Some(finalize_rx) = finalize_rx else {
button.set_label("Record");
widgets.status_label.set_text(&format!(
"{} recording stop failed: recording worker state was missing.",
pane.title
));
return;
};
let mut state = save_state.borrow_mut(); button.set_sensitive(false);
match finalize_recording(&mut state) { button.set_label("Finishing...");
Ok(output) => { let button = button.clone();
let widgets = widgets.clone();
let pane_title = pane.title.clone();
glib::timeout_add_local(Duration::from_millis(100), move || match finalize_rx
.try_recv()
{
Ok(Ok(output)) => {
button.set_sensitive(true);
button.set_label("Record"); button.set_label("Record");
widgets.status_label.set_text(&format!( widgets.status_label.set_text(&format!(
"{} recording saved to {}.", "{} recording saved to {}.",
pane.title, pane_title,
output.display() output.display()
)); ));
glib::ControlFlow::Break
} }
Err(err) => { Ok(Err(err)) => {
button.set_sensitive(true);
button.set_label("Record"); button.set_label("Record");
widgets.status_label.set_text(&format!( widgets.status_label.set_text(&format!(
"{} recording stop failed: {err}", "{} recording stop failed: {err}",
pane.title, pane_title,
)); ));
glib::ControlFlow::Break
} }
} Err(std::sync::mpsc::TryRecvError::Empty) => glib::ControlFlow::Continue,
Err(std::sync::mpsc::TryRecvError::Disconnected) => {
button.set_sensitive(true);
button.set_label("Record");
widgets.status_label.set_text(&format!(
"{} recording stop failed: recording worker disconnected.",
pane_title
));
glib::ControlFlow::Break
}
});
return; return;
} }
@ -400,14 +468,27 @@
return; return;
} }
let (frame_tx, frame_rx) = std::sync::mpsc::channel::<RecordFrameTask>();
let (result_tx, result_rx) = std::sync::mpsc::channel::<Result<PathBuf, String>>();
let frame_dir_worker = frame_dir.clone();
let output_path_worker = output_path.clone();
std::thread::spawn(move || {
let result = run_recording_worker(
frame_rx,
frame_dir_worker,
output_path_worker,
record_fps,
record_bitrate_kbit,
);
let _ = result_tx.send(result);
});
{ {
let mut state = save_state.borrow_mut(); let mut state = save_state.borrow_mut();
state.frame_dir = Some(frame_dir); state.frame_dir = Some(frame_dir);
state.output_path = Some(output_path.clone()); state.frame_writer_tx = Some(frame_tx);
state.finalize_rx = Some(result_rx);
state.next_frame_index = 0; state.next_frame_index = 0;
state.captured_frames = 0;
state.encode_fps = record_fps;
state.encode_bitrate_kbit = record_bitrate_kbit;
} }
let pane_for_tick = pane.clone(); let pane_for_tick = pane.clone();
@ -420,16 +501,21 @@
if state.frame_dir.is_none() { if state.frame_dir.is_none() {
return glib::ControlFlow::Break; return glib::ControlFlow::Break;
} }
if let Err(err) = write_record_frame(&mut state, &pane_for_tick.picture) { if let Err(err) = queue_record_frame(&mut state, &pane_for_tick.picture) {
if let Some(frame_writer_tx) = state.frame_writer_tx.take() {
let _ = frame_writer_tx.send(RecordFrameTask::Finish);
}
widgets_for_tick.status_label.set_text(&format!( widgets_for_tick.status_label.set_text(&format!(
"{} recording frame skipped: {err}", "{} recording frame skipped: {err}",
pane_for_tick.title pane_for_tick.title
)); ));
return glib::ControlFlow::Break;
} }
glib::ControlFlow::Continue glib::ControlFlow::Continue
}, },
); );
save_state.borrow_mut().timer = Some(timer); save_state.borrow_mut().timer = Some(timer);
button.set_sensitive(true);
button.set_label("Stop"); button.set_label("Stop");
widgets.status_label.set_text(&format!( widgets.status_label.set_text(&format!(
"Recording {} at {} fps (~{} kbit)... press Stop to finish.", "Recording {} at {} fps (~{} kbit)... press Stop to finish.",

View File

@ -44,7 +44,7 @@ pub use runtime::SyncProbeCapture;
#[cfg(any(not(coverage), test))] #[cfg(any(not(coverage), test))]
const PROBE_VIDEO_QUEUE: FreshQueueConfig = FreshQueueConfig { const PROBE_VIDEO_QUEUE: FreshQueueConfig = FreshQueueConfig {
capacity: 32, capacity: 32,
max_age: Duration::from_secs(1), max_age: Duration::from_millis(350),
}; };
#[cfg(any(not(coverage), test))] #[cfg(any(not(coverage), test))]

View File

@ -26,6 +26,9 @@ pub struct UplinkHarnessConfig {
pub consume_interval: Duration, pub consume_interval: Duration,
/// Number of packets admitted to the async queue before backpressure kicks in. /// Number of packets admitted to the async queue before backpressure kicks in.
pub queue_capacity: usize, pub queue_capacity: usize,
/// Optional maximum packet age before delivery. Older queued packets are
/// discarded to model freshness-first live media delivery.
pub freshness_max_age: Option<Duration>,
/// Total packets the synthetic capture source will attempt to produce. /// Total packets the synthetic capture source will attempt to produce.
pub total_packets: usize, pub total_packets: usize,
/// Optional one-shot downstream stall start time. /// Optional one-shot downstream stall start time.
@ -95,6 +98,15 @@ pub fn run_uplink_harness(
let now = event_time; let now = event_time;
if consume_ready == Some(now) { if consume_ready == Some(now) {
if let Some(max_age) = config.freshness_max_age {
while let Some(packet) = queue.front() {
if now.saturating_sub(packet.captured_at) <= max_age {
break;
}
let _ = queue.pop_front();
result.dropped_packets += 1;
}
}
if let Some(packet) = queue.pop_front() { if let Some(packet) = queue.pop_front() {
result.delivered_packets += 1; result.delivered_packets += 1;
result.max_delivery_age = result.max_delivery_age.max(now - packet.captured_at); result.max_delivery_age = result.max_delivery_age.max(now - packet.captured_at);
@ -182,6 +194,7 @@ mod tests {
capture_interval: Duration::from_millis(33), capture_interval: Duration::from_millis(33),
consume_interval: Duration::from_millis(33), consume_interval: Duration::from_millis(33),
queue_capacity: 8, queue_capacity: 8,
freshness_max_age: None,
total_packets: 160, total_packets: 160,
stall_after: Some(Duration::from_millis(800)), stall_after: Some(Duration::from_millis(800)),
stall_duration: Duration::from_secs(2), stall_duration: Duration::from_secs(2),
@ -193,6 +206,7 @@ mod tests {
capture_interval: Duration::from_millis(20), capture_interval: Duration::from_millis(20),
consume_interval: Duration::from_millis(20), consume_interval: Duration::from_millis(20),
queue_capacity: 16, queue_capacity: 16,
freshness_max_age: None,
total_packets: 320, total_packets: 320,
stall_after: Some(Duration::from_millis(600)), stall_after: Some(Duration::from_millis(600)),
stall_duration: Duration::from_secs(2), stall_duration: Duration::from_secs(2),

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.15.3" version = "0.15.4"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -29,6 +29,8 @@ MEDIA_TESTS=(
--test client_launcher_runtime_contract --test client_launcher_runtime_contract
--test client_microphone_include_contract --test client_microphone_include_contract
--test client_microphone_source_contract --test client_microphone_source_contract
--test client_uplink_freshness_contract
--test client_uplink_performance_contract
--test client_output_video_include_contract --test client_output_video_include_contract
--test handshake_camera_contract --test handshake_camera_contract
--test server_camera_contract --test server_camera_contract
@ -127,6 +129,7 @@ lines = [
'deterministic coverage', 'deterministic coverage',
'- bounded appsrc/appsink queue contracts', '- bounded appsrc/appsink queue contracts',
'- stale-frame/drop-over-latency contracts', '- stale-frame/drop-over-latency contracts',
'- A/V uplink freshness budget contracts',
'- local monotonic timestamp contracts', '- local monotonic timestamp contracts',
'- IDR/keyframe recovery contracts', '- IDR/keyframe recovery contracts',
'- HDMI/UVC sink construction contracts', '- HDMI/UVC sink construction contracts',

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.15.3" version = "0.15.4"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -0,0 +1,96 @@
//! Contract guardrails for uplink queue freshness budgets.
//!
//! Scope: source-level checks over client uplink queue constants.
//! Targets: `client/src/app/uplink_media.rs`, `client/src/sync_probe/capture.rs`.
//! Why: lip-sync quality depends on bounded queue age; accidental widening can
//! create near-second video lag under load.
const UPLINK_MEDIA_SRC: &str = include_str!("../../client/src/app/uplink_media.rs");
const SYNC_PROBE_CAPTURE_SRC: &str = include_str!("../../client/src/sync_probe/capture.rs");
/// Extracts the source text of a queue-config constant: everything from its
/// `const NAME:` marker up to (but not including) the closing `};`.
///
/// Panics when either the marker or the terminator is absent so the contract
/// test fails loudly instead of silently inspecting the wrong region.
fn queue_block<'a>(src: &'a str, queue_const: &str) -> &'a str {
    let marker = format!("const {queue_const}:");
    let Some(start) = src.find(&marker) else {
        panic!("missing queue constant marker: {marker}")
    };
    let tail = &src[start..];
    match tail.find("};") {
        Some(end) => &tail[..end],
        None => panic!("missing queue terminator for {queue_const}"),
    }
}
/// Parses the `capacity:` field out of a queue-config source block.
///
/// Accepts the field with or without a trailing comma (it may be the final
/// field of the struct literal), stopping at a comma, a line break, or a
/// closing brace. Panics with the offending constant name when the field is
/// missing or malformed so the failing contract points at the regression.
fn parse_queue_capacity(block: &str, queue_const: &str) -> u64 {
    let marker = "capacity:";
    let start = block
        .find(marker)
        .unwrap_or_else(|| panic!("missing capacity for {queue_const}"));
    let tail = &block[start + marker.len()..];
    // Splitting only on ',' would swallow the rest of the block (and fail to
    // parse) whenever `capacity` is written without a trailing comma; also
    // stop at line breaks and '}' so a comma-less final field still parses.
    let value = tail
        .trim_start()
        .split(|c: char| matches!(c, ',' | '\n' | '\r' | '}'))
        .next()
        .unwrap_or("")
        .trim();
    value
        .parse::<u64>()
        .unwrap_or_else(|_| panic!("invalid capacity value for {queue_const}: {value}"))
}
/// Reads `max_age` from a queue-config block and normalizes it to
/// milliseconds, accepting either the `Duration::from_millis(..)` or the
/// `Duration::from_secs(..)` spelling. Panics when the field is absent.
fn parse_queue_max_age_ms(block: &str, queue_const: &str) -> u64 {
    // Locates `marker` and returns the text between it and the next ')'.
    let read_arg = |marker: &str| -> Option<&str> {
        let start = block.find(marker)?;
        let tail = &block[start + marker.len()..];
        Some(tail.split(')').next().unwrap_or("").trim())
    };
    if let Some(value) = read_arg("max_age: Duration::from_millis(") {
        return value
            .parse::<u64>()
            .unwrap_or_else(|_| panic!("invalid millis max_age for {queue_const}: {value}"));
    }
    if let Some(value) = read_arg("max_age: Duration::from_secs(") {
        let seconds = value
            .parse::<u64>()
            .unwrap_or_else(|_| panic!("invalid seconds max_age for {queue_const}: {value}"));
        return seconds.saturating_mul(1_000);
    }
    panic!("missing max_age for {queue_const}");
}
#[test]
fn camera_uplink_queue_freshness_budget_stays_within_lipsync_window() {
    // 350ms is the lip-sync ceiling for the camera uplink; widening it
    // reintroduces near-second video drift under load.
    const LIPSYNC_BUDGET_MS: u64 = 350;
    let max_age_ms = parse_queue_max_age_ms(
        queue_block(UPLINK_MEDIA_SRC, "VIDEO_UPLINK_QUEUE"),
        "VIDEO_UPLINK_QUEUE",
    );
    assert!(
        max_age_ms <= LIPSYNC_BUDGET_MS,
        "VIDEO_UPLINK_QUEUE max_age is {max_age_ms}ms; keep it <= 350ms to prevent ~1s video drift"
    );
}
#[test]
fn microphone_uplink_queue_freshness_budget_stays_within_live_audio_window() {
    // Live conversation tolerates slightly more audio delay than video drift,
    // hence a 400ms ceiling for the microphone uplink queue.
    const LIVE_AUDIO_BUDGET_MS: u64 = 400;
    let max_age_ms = parse_queue_max_age_ms(
        queue_block(UPLINK_MEDIA_SRC, "AUDIO_UPLINK_QUEUE"),
        "AUDIO_UPLINK_QUEUE",
    );
    assert!(
        max_age_ms <= LIVE_AUDIO_BUDGET_MS,
        "AUDIO_UPLINK_QUEUE max_age is {max_age_ms}ms; keep it <= 400ms for live calls"
    );
}
#[test]
fn camera_uplink_queue_capacity_remains_bounded() {
    // A deeper queue only stores more stale frames during a stall, which
    // shows up as tail latency rather than resilience.
    const CAPACITY_CEILING: u64 = 32;
    let capacity = parse_queue_capacity(
        queue_block(UPLINK_MEDIA_SRC, "VIDEO_UPLINK_QUEUE"),
        "VIDEO_UPLINK_QUEUE",
    );
    assert!(
        capacity <= CAPACITY_CEILING,
        "VIDEO_UPLINK_QUEUE capacity is {capacity}; larger queues amplify tail-latency under stalls"
    );
}
#[test]
fn sync_probe_video_queue_uses_same_freshness_budget() {
    // The sync probe must measure with the same freshness policy the runtime
    // uses, otherwise its readings stop reflecting real delivery behavior.
    const LIPSYNC_BUDGET_MS: u64 = 350;
    let max_age_ms = parse_queue_max_age_ms(
        queue_block(SYNC_PROBE_CAPTURE_SRC, "PROBE_VIDEO_QUEUE"),
        "PROBE_VIDEO_QUEUE",
    );
    assert!(
        max_age_ms <= LIPSYNC_BUDGET_MS,
        "PROBE_VIDEO_QUEUE max_age is {max_age_ms}ms; keep probe and runtime freshness policies aligned"
    );
}

View File

@ -0,0 +1,83 @@
//! Synthetic performance contract for the upstream media queues.
//!
//! Scope: deterministic backpressure simulation without physical devices.
//! Targets: `client/src/uplink_latency_harness.rs`.
//! Why: A/V sync depends on dropping stale media under stalls instead of
//! preserving a growing delay buffer.
#[path = "../../client/src/uplink_latency_harness.rs"]
#[allow(warnings)]
mod uplink_latency_harness;
use std::time::Duration;
use uplink_latency_harness::{UplinkHarnessConfig, UplinkQueuePolicy, run_uplink_harness};
/// Synthetic camera scenario: ~30fps capture/consume with a 2s downstream
/// stall starting at 800ms, under the 350ms lip-sync freshness budget.
fn camera_stall_config() -> UplinkHarnessConfig {
    // Capture and consume tick at the same ~30fps cadence.
    let frame_interval = Duration::from_millis(33);
    UplinkHarnessConfig {
        capture_interval: frame_interval,
        consume_interval: frame_interval,
        queue_capacity: 32,
        freshness_max_age: Some(Duration::from_millis(350)),
        total_packets: 240,
        stall_after: Some(Duration::from_millis(800)),
        stall_duration: Duration::from_secs(2),
    }
}
/// Synthetic microphone scenario: 20ms audio chunks with a 2s downstream
/// stall starting at 600ms, under the 400ms live-audio freshness budget.
fn microphone_stall_config() -> UplinkHarnessConfig {
    // Audio is chunked at a fixed 20ms cadence on both ends of the queue.
    let chunk_interval = Duration::from_millis(20);
    UplinkHarnessConfig {
        capture_interval: chunk_interval,
        consume_interval: chunk_interval,
        queue_capacity: 16,
        freshness_max_age: Some(Duration::from_millis(400)),
        total_packets: 320,
        stall_after: Some(Duration::from_millis(600)),
        stall_duration: Duration::from_secs(2),
    }
}
#[test]
fn freshness_first_camera_policy_keeps_video_delivery_age_bounded_under_stall() {
    let outcome =
        run_uplink_harness(camera_stall_config(), UplinkQueuePolicy::DropOldestWhenFull);
    // The stall must surface as stale-frame drops, not as a growing backlog.
    assert!(
        outcome.dropped_packets > 0,
        "camera stall should drop stale frames instead of preserving backlog"
    );
    // Delivery age staying within the freshness budget is the lip-sync contract.
    assert!(
        outcome.max_delivery_age <= Duration::from_millis(350),
        "camera delivery age must stay <=350ms under synthetic stall, got {:?}",
        outcome.max_delivery_age
    );
}
#[test]
fn freshness_first_microphone_policy_keeps_audio_delivery_age_bounded_under_stall() {
    let config = microphone_stall_config();
    let outcome = run_uplink_harness(config, UplinkQueuePolicy::DropOldestWhenFull);
    // Stale chunks are dropped rather than queued up behind the stall.
    assert!(
        outcome.dropped_packets > 0,
        "microphone stall should drop stale chunks instead of preserving backlog"
    );
    // Audio delivery age must stay inside the live-call freshness budget.
    assert!(
        outcome.max_delivery_age <= Duration::from_millis(400),
        "microphone delivery age must stay <=400ms under synthetic stall, got {:?}",
        outcome.max_delivery_age
    );
}
#[test]
fn preserve_backlog_policy_would_violate_the_lip_sync_budget() {
    // Same camera scenario, but with no freshness bound and a backlog-keeping
    // policy: the delay buffer should demonstrably grow past a full second.
    let config = UplinkHarnessConfig {
        freshness_max_age: None,
        ..camera_stall_config()
    };
    let outcome = run_uplink_harness(config, UplinkQueuePolicy::PreserveBacklog);
    assert!(
        outcome.max_delivery_age >= Duration::from_secs(1),
        "preserve-backlog policy should demonstrate the regression class, got {:?}",
        outcome.max_delivery_age
    );
}