diff --git a/Cargo.lock b/Cargo.lock index 3a5d97b..674e03d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.15.3" +version = "0.15.4" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.15.3" +version = "0.15.4" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.15.3" +version = "0.15.4" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 7e0521f..a7ee0aa 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.15.3" +version = "0.15.4" edition = "2024" [dependencies] diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index c42dbad..97a974b 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -203,7 +203,7 @@ impl LesavkaClientApp { const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = crate::uplink_fresh_queue::FreshQueueConfig { capacity: 32, - max_age: Duration::from_secs(1), + max_age: Duration::from_millis(350), }; #[cfg(not(coverage))] diff --git a/client/src/launcher/diagnostics/recommendations.rs b/client/src/launcher/diagnostics/recommendations.rs index a93bb15..b195408 100644 --- a/client/src/launcher/diagnostics/recommendations.rs +++ b/client/src/launcher/diagnostics/recommendations.rs @@ -1,5 +1,5 @@ pub fn quality_probe_command() -> &'static str { - "scripts/ci/hygiene_gate.sh && scripts/ci/quality_gate.sh" + "scripts/ci/platform_quality_gate.sh" } fn capture_profile_label(capture: &CaptureSizeChoice, stream_caps_label: &str) -> String { diff --git a/client/src/launcher/tests/diagnostics.rs b/client/src/launcher/tests/diagnostics.rs index ddeada8..f4c944e 100644 --- a/client/src/launcher/tests/diagnostics.rs +++ b/client/src/launcher/tests/diagnostics.rs @@ -201,7 +201,7 @@ fn snapshot_json_is_serializable_and_mentions_probe_command() { quality_probe_command().to_string(), ); let json = report.to_pretty_json().expect("serialize"); - assert!(json.contains("quality_gate.sh")); + assert!(json.contains("platform_quality_gate.sh")); assert!(json.contains("routing")); assert!(json.contains("view_mode")); } @@ -349,10 +349,9 @@ fn snapshot_report_uses_effective_mirrored_capture_profile() { } #[test] -fn quality_probe_command_mentions_both_gates() { +fn quality_probe_command_runs_full_platform_gate() { let cmd = quality_probe_command(); - assert!(cmd.contains("hygiene_gate.sh")); - assert!(cmd.contains("quality_gate.sh")); + assert_eq!(cmd, "scripts/ci/platform_quality_gate.sh"); } #[test] diff --git a/client/src/launcher/ui/utility_button_bindings.rs b/client/src/launcher/ui/utility_button_bindings.rs index 8172008..a3b3ec7 100644 --- a/client/src/launcher/ui/utility_button_bindings.rs +++ b/client/src/launcher/ui/utility_button_bindings.rs @@ -6,11 +6,17 @@ save_dir_override: Option, timer: Option, frame_dir: Option, - output_path: Option, + frame_writer_tx: Option>, + finalize_rx: Option>>, next_frame_index: u32, - captured_frames: u32, - encode_fps: u32, - encode_bitrate_kbit: u32, + } + + enum RecordFrameTask { + Frame { + texture: gtk::gdk::Texture, + frame_path: PathBuf, + }, + Finish, } fn eye_slug(title: &str) -> &'static str { @@ -22,6 +28,11 @@ } fn timestamp_slug() -> String { + if let Ok(now) = glib::DateTime::now_local() + && let Ok(stamp) = now.format("%Y%m%d-%H%M%S") + { + return stamp.to_string(); + } let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default(); @@ -46,18 +57,18 @@ if let Some(raw) = std::env::var_os("XDG_PICTURES_DIR") { let path = expand_home_token(&raw.to_string_lossy()); if !path.as_os_str().is_empty() { - return path.join("Lesavka"); + return path.join("lesavka"); } } if let Some(home) = std::env::var_os("HOME") { - return PathBuf::from(home).join("Pictures").join("Lesavka"); + return PathBuf::from(home).join("Pictures").join("lesavka"); } if let Some(profile) = std::env::var_os("USERPROFILE") { - return PathBuf::from(profile).join("Pictures").join("Lesavka"); + return PathBuf::from(profile).join("Pictures").join("lesavka"); } std::env::current_dir() .unwrap_or_else(|_| PathBuf::from(".")) - .join("Lesavka") + .join("lesavka") } fn ensure_eye_capture_root(override_dir: Option<&PathBuf>) -> Result { @@ -127,44 +138,37 @@ (fps, bitrate_kbit) } - fn write_record_frame(state: &mut EyeRecordState, picture: >k::Picture) -> Result<(), String> { + fn queue_record_frame(state: &mut EyeRecordState, picture: >k::Picture) -> Result<(), String> { let frame_dir = state .frame_dir .as_ref() .ok_or_else(|| "recording session is not initialized".to_string())? .clone(); + let frame_writer_tx = state + .frame_writer_tx + .as_ref() + .ok_or_else(|| "recording worker is not initialized".to_string())? + .clone(); let texture = current_eye_texture(picture)?; let frame_path = frame_dir.join(format!("frame-{:06}.png", state.next_frame_index)); - save_texture_png(&texture, &frame_path)?; + frame_writer_tx + .send(RecordFrameTask::Frame { + texture, + frame_path, + }) + .map_err(|_| "recording worker stopped unexpectedly".to_string())?; state.next_frame_index = state.next_frame_index.saturating_add(1); - state.captured_frames = state.captured_frames.saturating_add(1); Ok(()) } - fn finalize_recording(state: &mut EyeRecordState) -> Result { - let frame_dir = state - .frame_dir - .take() - .ok_or_else(|| "recording frames were not initialized".to_string())?; - let output_path = state - .output_path - .take() - .ok_or_else(|| "recording output path was not initialized".to_string())?; - let captured_frames = state.captured_frames; - let encode_fps = state.encode_fps.max(1); - let encode_bitrate_kbit = state.encode_bitrate_kbit.max(800); - state.captured_frames = 0; - state.next_frame_index = 0; - state.encode_fps = 0; - state.encode_bitrate_kbit = 0; - - if captured_frames < 2 { - let _ = std::fs::remove_dir_all(&frame_dir); - return Err("need at least two captured frames to build a recording".to_string()); - } - + fn encode_recording( + frame_dir: &PathBuf, + output_path: &PathBuf, + encode_fps: u32, + encode_bitrate_kbit: u32, + ) -> Result<(), String> { let frame_pattern = frame_dir.join("frame-%06d.png"); - let bitrate_arg = format!("{encode_bitrate_kbit}k"); + let bitrate_arg = format!("{}k", encode_bitrate_kbit.max(800)); let encode = Command::new("ffmpeg") .args([ "-hide_banner", @@ -172,7 +176,7 @@ "error", "-y", "-framerate", - &encode_fps.to_string(), + &encode_fps.max(1).to_string(), "-i", &frame_pattern.to_string_lossy(), "-c:v", @@ -180,7 +184,7 @@ "-pix_fmt", "yuv420p", "-r", - &encode_fps.to_string(), + &encode_fps.max(1).to_string(), "-b:v", &bitrate_arg, &output_path.to_string_lossy(), @@ -195,7 +199,36 @@ frame_dir.display() )); } + Ok(()) + } + fn run_recording_worker( + frame_rx: std::sync::mpsc::Receiver, + frame_dir: PathBuf, + output_path: PathBuf, + encode_fps: u32, + encode_bitrate_kbit: u32, + ) -> Result { + let mut captured_frames = 0_u32; + loop { + match frame_rx.recv() { + Ok(RecordFrameTask::Frame { + texture, + frame_path, + }) => { + save_texture_png(&texture, &frame_path)?; + captured_frames = captured_frames.saturating_add(1); + } + Ok(RecordFrameTask::Finish) | Err(_) => break, + } + } + + if captured_frames < 2 { + let _ = std::fs::remove_dir_all(&frame_dir); + return Err("need at least two captured frames to build a recording".to_string()); + } + + encode_recording(&frame_dir, &output_path, encode_fps, encode_bitrate_kbit)?; let _ = std::fs::remove_dir_all(&frame_dir); Ok(output_path) } @@ -344,30 +377,65 @@ let record_button = pane.record_button.clone(); record_button.connect_clicked(move |button| { if save_state.borrow().timer.is_some() { - let mut state = save_state.borrow_mut(); - if let Some(timer) = state.timer.take() { - timer.remove(); - } - drop(state); + let finalize_rx = { + let mut state = save_state.borrow_mut(); + if let Some(timer) = state.timer.take() { + timer.remove(); + } + if let Some(frame_writer_tx) = state.frame_writer_tx.take() { + let _ = frame_writer_tx.send(RecordFrameTask::Finish); + } + state.next_frame_index = 0; + state.frame_dir = None; + state.finalize_rx.take() + }; + let Some(finalize_rx) = finalize_rx else { + button.set_label("Record"); + widgets.status_label.set_text(&format!( + "{} recording stop failed: recording worker state was missing.", + pane.title + )); + return; + }; - let mut state = save_state.borrow_mut(); - match finalize_recording(&mut state) { - Ok(output) => { + button.set_sensitive(false); + button.set_label("Finishing..."); + let button = button.clone(); + let widgets = widgets.clone(); + let pane_title = pane.title.clone(); + glib::timeout_add_local(Duration::from_millis(100), move || match finalize_rx + .try_recv() + { + Ok(Ok(output)) => { + button.set_sensitive(true); button.set_label("Record"); widgets.status_label.set_text(&format!( "{} recording saved to {}.", - pane.title, + pane_title, output.display() )); + glib::ControlFlow::Break } - Err(err) => { + Ok(Err(err)) => { + button.set_sensitive(true); button.set_label("Record"); widgets.status_label.set_text(&format!( "{} recording stop failed: {err}", - pane.title, + pane_title, )); + glib::ControlFlow::Break } - } + Err(std::sync::mpsc::TryRecvError::Empty) => glib::ControlFlow::Continue, + Err(std::sync::mpsc::TryRecvError::Disconnected) => { + button.set_sensitive(true); + button.set_label("Record"); + widgets.status_label.set_text(&format!( + "{} recording stop failed: recording worker disconnected.", + pane_title + )); + glib::ControlFlow::Break + } + }); return; } @@ -400,14 +468,27 @@ return; } + let (frame_tx, frame_rx) = std::sync::mpsc::channel::(); + let (result_tx, result_rx) = std::sync::mpsc::channel::>(); + let frame_dir_worker = frame_dir.clone(); + let output_path_worker = output_path.clone(); + std::thread::spawn(move || { + let result = run_recording_worker( + frame_rx, + frame_dir_worker, + output_path_worker, + record_fps, + record_bitrate_kbit, + ); + let _ = result_tx.send(result); + }); + { let mut state = save_state.borrow_mut(); state.frame_dir = Some(frame_dir); - state.output_path = Some(output_path.clone()); + state.frame_writer_tx = Some(frame_tx); + state.finalize_rx = Some(result_rx); state.next_frame_index = 0; - state.captured_frames = 0; - state.encode_fps = record_fps; - state.encode_bitrate_kbit = record_bitrate_kbit; } let pane_for_tick = pane.clone(); @@ -420,16 +501,21 @@ if state.frame_dir.is_none() { return glib::ControlFlow::Break; } - if let Err(err) = write_record_frame(&mut state, &pane_for_tick.picture) { + if let Err(err) = queue_record_frame(&mut state, &pane_for_tick.picture) { + if let Some(frame_writer_tx) = state.frame_writer_tx.take() { + let _ = frame_writer_tx.send(RecordFrameTask::Finish); + } widgets_for_tick.status_label.set_text(&format!( "{} recording frame skipped: {err}", pane_for_tick.title )); + return glib::ControlFlow::Break; } glib::ControlFlow::Continue }, ); save_state.borrow_mut().timer = Some(timer); + button.set_sensitive(true); button.set_label("Stop"); widgets.status_label.set_text(&format!( "Recording {} at {} fps (~{} kbit)... press Stop to finish.", diff --git a/client/src/sync_probe/capture.rs b/client/src/sync_probe/capture.rs index 0177bd2..fd909cf 100644 --- a/client/src/sync_probe/capture.rs +++ b/client/src/sync_probe/capture.rs @@ -44,7 +44,7 @@ pub use runtime::SyncProbeCapture; #[cfg(any(not(coverage), test))] const PROBE_VIDEO_QUEUE: FreshQueueConfig = FreshQueueConfig { capacity: 32, - max_age: Duration::from_secs(1), + max_age: Duration::from_millis(350), }; #[cfg(any(not(coverage), test))] diff --git a/client/src/uplink_latency_harness.rs b/client/src/uplink_latency_harness.rs index dd2d801..b164919 100644 --- a/client/src/uplink_latency_harness.rs +++ b/client/src/uplink_latency_harness.rs @@ -26,6 +26,9 @@ pub struct UplinkHarnessConfig { pub consume_interval: Duration, /// Number of packets admitted to the async queue before backpressure kicks in. pub queue_capacity: usize, + /// Optional maximum packet age before delivery. Older queued packets are + /// discarded to model freshness-first live media delivery. + pub freshness_max_age: Option, /// Total packets the synthetic capture source will attempt to produce. pub total_packets: usize, /// Optional one-shot downstream stall start time. @@ -95,6 +98,15 @@ pub fn run_uplink_harness( let now = event_time; if consume_ready == Some(now) { + if let Some(max_age) = config.freshness_max_age { + while let Some(packet) = queue.front() { + if now.saturating_sub(packet.captured_at) <= max_age { + break; + } + let _ = queue.pop_front(); + result.dropped_packets += 1; + } + } if let Some(packet) = queue.pop_front() { result.delivered_packets += 1; result.max_delivery_age = result.max_delivery_age.max(now - packet.captured_at); @@ -182,6 +194,7 @@ mod tests { capture_interval: Duration::from_millis(33), consume_interval: Duration::from_millis(33), queue_capacity: 8, + freshness_max_age: None, total_packets: 160, stall_after: Some(Duration::from_millis(800)), stall_duration: Duration::from_secs(2), @@ -193,6 +206,7 @@ mod tests { capture_interval: Duration::from_millis(20), consume_interval: Duration::from_millis(20), queue_capacity: 16, + freshness_max_age: None, total_packets: 320, stall_after: Some(Duration::from_millis(600)), stall_duration: Duration::from_secs(2), diff --git a/common/Cargo.toml b/common/Cargo.toml index 36c5bc6..749ff74 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.15.3" +version = "0.15.4" edition = "2024" build = "build.rs" diff --git a/scripts/ci/media_reliability_gate.sh b/scripts/ci/media_reliability_gate.sh index 8087d8c..e8022f1 100755 --- a/scripts/ci/media_reliability_gate.sh +++ b/scripts/ci/media_reliability_gate.sh @@ -29,6 +29,8 @@ MEDIA_TESTS=( --test client_launcher_runtime_contract --test client_microphone_include_contract --test client_microphone_source_contract + --test client_uplink_freshness_contract + --test client_uplink_performance_contract --test client_output_video_include_contract --test handshake_camera_contract --test server_camera_contract @@ -127,6 +129,7 @@ lines = [ 'deterministic coverage', '- bounded appsrc/appsink queue contracts', '- stale-frame/drop-over-latency contracts', + '- A/V uplink freshness budget contracts', '- local monotonic timestamp contracts', '- IDR/keyframe recovery contracts', '- HDMI/UVC sink construction contracts', diff --git a/server/Cargo.toml b/server/Cargo.toml index afec6b9..3d7e165 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.15.3" +version = "0.15.4" edition = "2024" autobins = false diff --git a/testing/tests/client_uplink_freshness_contract.rs b/testing/tests/client_uplink_freshness_contract.rs new file mode 100644 index 0000000..c48271f --- /dev/null +++ b/testing/tests/client_uplink_freshness_contract.rs @@ -0,0 +1,96 @@ +//! Contract guardrails for uplink queue freshness budgets. +//! +//! Scope: source-level checks over client uplink queue constants. +//! Targets: `client/src/app/uplink_media.rs`, `client/src/sync_probe/capture.rs`. +//! Why: lip-sync quality depends on bounded queue age; accidental widening can +//! create near-second video lag under load. + +const UPLINK_MEDIA_SRC: &str = include_str!("../../client/src/app/uplink_media.rs"); +const SYNC_PROBE_CAPTURE_SRC: &str = include_str!("../../client/src/sync_probe/capture.rs"); + +fn queue_block<'a>(src: &'a str, queue_const: &str) -> &'a str { + let marker = format!("const {queue_const}:"); + let start = src + .find(&marker) + .unwrap_or_else(|| panic!("missing queue constant marker: {marker}")); + let tail = &src[start..]; + let end = tail + .find("};") + .unwrap_or_else(|| panic!("missing queue terminator for {queue_const}")); + &tail[..end] +} + +fn parse_queue_capacity(block: &str, queue_const: &str) -> u64 { + let marker = "capacity:"; + let start = block + .find(marker) + .unwrap_or_else(|| panic!("missing capacity for {queue_const}")); + let tail = &block[start + marker.len()..]; + let value = tail.trim_start().split(',').next().unwrap_or("").trim(); + value + .parse::() + .unwrap_or_else(|_| panic!("invalid capacity value for {queue_const}: {value}")) +} + +fn parse_queue_max_age_ms(block: &str, queue_const: &str) -> u64 { + let millis_marker = "max_age: Duration::from_millis("; + if let Some(start) = block.find(millis_marker) { + let tail = &block[start + millis_marker.len()..]; + let value = tail.split(')').next().unwrap_or("").trim(); + return value + .parse::() + .unwrap_or_else(|_| panic!("invalid millis max_age for {queue_const}: {value}")); + } + + let secs_marker = "max_age: Duration::from_secs("; + if let Some(start) = block.find(secs_marker) { + let tail = &block[start + secs_marker.len()..]; + let value = tail.split(')').next().unwrap_or("").trim(); + let seconds = value + .parse::() + .unwrap_or_else(|_| panic!("invalid seconds max_age for {queue_const}: {value}")); + return seconds.saturating_mul(1_000); + } + + panic!("missing max_age for {queue_const}"); +} + +#[test] +fn camera_uplink_queue_freshness_budget_stays_within_lipsync_window() { + let block = queue_block(UPLINK_MEDIA_SRC, "VIDEO_UPLINK_QUEUE"); + let max_age_ms = parse_queue_max_age_ms(block, "VIDEO_UPLINK_QUEUE"); + assert!( + max_age_ms <= 350, + "VIDEO_UPLINK_QUEUE max_age is {max_age_ms}ms; keep it <= 350ms to prevent ~1s video drift" + ); +} + +#[test] +fn microphone_uplink_queue_freshness_budget_stays_within_live_audio_window() { + let block = queue_block(UPLINK_MEDIA_SRC, "AUDIO_UPLINK_QUEUE"); + let max_age_ms = parse_queue_max_age_ms(block, "AUDIO_UPLINK_QUEUE"); + assert!( + max_age_ms <= 400, + "AUDIO_UPLINK_QUEUE max_age is {max_age_ms}ms; keep it <= 400ms for live calls" + ); +} + +#[test] +fn camera_uplink_queue_capacity_remains_bounded() { + let block = queue_block(UPLINK_MEDIA_SRC, "VIDEO_UPLINK_QUEUE"); + let capacity = parse_queue_capacity(block, "VIDEO_UPLINK_QUEUE"); + assert!( + capacity <= 32, + "VIDEO_UPLINK_QUEUE capacity is {capacity}; larger queues amplify tail-latency under stalls" + ); +} + +#[test] +fn sync_probe_video_queue_uses_same_freshness_budget() { + let block = queue_block(SYNC_PROBE_CAPTURE_SRC, "PROBE_VIDEO_QUEUE"); + let max_age_ms = parse_queue_max_age_ms(block, "PROBE_VIDEO_QUEUE"); + assert!( + max_age_ms <= 350, + "PROBE_VIDEO_QUEUE max_age is {max_age_ms}ms; keep probe and runtime freshness policies aligned" + ); +} diff --git a/testing/tests/client_uplink_performance_contract.rs b/testing/tests/client_uplink_performance_contract.rs new file mode 100644 index 0000000..3fbd6b9 --- /dev/null +++ b/testing/tests/client_uplink_performance_contract.rs @@ -0,0 +1,83 @@ +//! Synthetic performance contract for the upstream media queues. +//! +//! Scope: deterministic backpressure simulation without physical devices. +//! Targets: `client/src/uplink_latency_harness.rs`. +//! Why: A/V sync depends on dropping stale media under stalls instead of +//! preserving a growing delay buffer. + +#[path = "../../client/src/uplink_latency_harness.rs"] +#[allow(warnings)] +mod uplink_latency_harness; + +use std::time::Duration; +use uplink_latency_harness::{UplinkHarnessConfig, UplinkQueuePolicy, run_uplink_harness}; + +fn camera_stall_config() -> UplinkHarnessConfig { + UplinkHarnessConfig { + capture_interval: Duration::from_millis(33), + consume_interval: Duration::from_millis(33), + queue_capacity: 32, + freshness_max_age: Some(Duration::from_millis(350)), + total_packets: 240, + stall_after: Some(Duration::from_millis(800)), + stall_duration: Duration::from_secs(2), + } +} + +fn microphone_stall_config() -> UplinkHarnessConfig { + UplinkHarnessConfig { + capture_interval: Duration::from_millis(20), + consume_interval: Duration::from_millis(20), + queue_capacity: 16, + freshness_max_age: Some(Duration::from_millis(400)), + total_packets: 320, + stall_after: Some(Duration::from_millis(600)), + stall_duration: Duration::from_secs(2), + } +} + +#[test] +fn freshness_first_camera_policy_keeps_video_delivery_age_bounded_under_stall() { + let result = run_uplink_harness(camera_stall_config(), UplinkQueuePolicy::DropOldestWhenFull); + + assert!( + result.dropped_packets > 0, + "camera stall should drop stale frames instead of preserving backlog" + ); + assert!( + result.max_delivery_age <= Duration::from_millis(350), + "camera delivery age must stay <=350ms under synthetic stall, got {:?}", + result.max_delivery_age + ); +} + +#[test] +fn freshness_first_microphone_policy_keeps_audio_delivery_age_bounded_under_stall() { + let result = run_uplink_harness( + microphone_stall_config(), + UplinkQueuePolicy::DropOldestWhenFull, + ); + + assert!( + result.dropped_packets > 0, + "microphone stall should drop stale chunks instead of preserving backlog" + ); + assert!( + result.max_delivery_age <= Duration::from_millis(400), + "microphone delivery age must stay <=400ms under synthetic stall, got {:?}", + result.max_delivery_age + ); +} + +#[test] +fn preserve_backlog_policy_would_violate_the_lip_sync_budget() { + let mut config = camera_stall_config(); + config.freshness_max_age = None; + let result = run_uplink_harness(config, UplinkQueuePolicy::PreserveBacklog); + + assert!( + result.max_delivery_age >= Duration::from_secs(1), + "preserve-backlog policy should demonstrate the regression class, got {:?}", + result.max_delivery_age + ); +}