From 609517de0328706807182ba4177e6e5e04b6e9d8 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sat, 2 May 2026 10:31:22 -0300 Subject: [PATCH] media: clamp future capture pts and live-switch devices --- AGENTS.md | 4 +- Cargo.lock | 6 +- client/Cargo.toml | 2 +- client/src/app/downlink_media.rs | 31 ++- client/src/app/session_lifecycle.rs | 95 +++------- client/src/app/uplink_media.rs | 178 +++++++++++++++++- client/src/input/camera/capture_pipeline.rs | 29 ++- client/src/input/microphone.rs | 25 ++- client/src/launcher/tests/ui_runtime.rs | 10 + client/src/launcher/ui/control_requests.rs | 25 +++ .../src/launcher/ui/media_device_bindings.rs | 14 +- .../src/launcher/ui/stage_device_bindings.rs | 14 +- .../src/launcher/ui_runtime/control_paths.rs | 6 +- client/src/live_capture_clock.rs | 95 +++++++++- client/src/live_media_control.rs | 142 +++++++++++++- client/src/output/audio.rs | 33 +++- common/Cargo.toml | 2 +- server/Cargo.toml | 2 +- .../tests/client_camera_include_contract.rs | 1 + .../tests/support/live_capture_clock_shim.rs | 38 ++++ 20 files changed, 632 insertions(+), 120 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index f6f2d4d..956ee71 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -154,7 +154,7 @@ Context: the mirrored browser probe finally reproduced the real failure class on - [ ] Surface `Starting`, `Healing`, `Flowing`, `Lagging`, `Dropping`, and `Stale` states in chips/diagnostics from real path evidence. ### Phase 4: Recovery And Mid-Session Changes -- [ ] Make device changes trigger soft-pause, stream replacement, queue flush, and re-pairing. +- [x] Make device changes trigger soft-pause, stream replacement, queue flush, and re-pairing. - [ ] Keep recovery soft-first; reserve hard UVC/UAC gadget rebuilds for explicit guarded recoveries. - [ ] Add cooldown/state guards so recovery buttons cannot wedge Theia. - [ ] Ensure disconnect closes all client/server media tasks for the session. @@ -229,3 +229,5 @@ Context: 0.16.x proved that queue tweaks and static calibration cannot guarantee - 2026-05-02: 0.17.3 Google Meet manual test improved to roughly sub-second/near-quarter-second lip sync, but the mirrored analyzer could not pair pulses and the user still heard choppy background audio. Client logs showed Pulse microphone packets arriving unevenly with ages around `90-240ms`; patch 0.17.4 lowers Pulse mic `buffer-time`/`latency-time`, bounds the mic queue/appsink, and keeps mirrored-probe after-run planner diagnostics even when analysis fails. - 2026-05-02: 0.17.4 mirrored run was salvageable after an SCP banner timeout, but analysis still failed with no close pulse pairs. The client log still showed `180-240ms` microphone delivery ages, pointing at server playout sleeps backpressuring the gRPC microphone stream. Patch 0.17.5 drains inbound microphone packets while waiting for scheduled UAC playout and retries browser-capture SCP fetches. - 2026-05-02: 0.17.5 mirrored run still failed with insufficient paired evidence, and the client log still showed recurring `180-240ms` microphone packet age while camera age stayed near zero. Patch 0.17.6 splits oversized mic samples into `20ms` timestamped packets and keeps a short fresh server-side audio window instead of collapsing every pending burst to one newest chunk, aiming to preserve lip sync without making background audio choppy. +- 2026-05-02: 0.17.6 Bumblebee mirrored run proved Bumblebee mic packets are already `10ms`, but camera source timestamps were being rebased up to roughly `1.8s` into the future while mic packets sat around `180-240ms` old. Patch 0.17.7 adds a source lead cap (`80ms` default) to both direct and duration-paced client timestamp rebasing so bursty camera buffers cannot make the server wait for fake future video while fresh audio keeps moving. +- 2026-05-02: The launcher UI was still writing live control files with only camera/mic/speaker booleans, so media device combo changes were honestly only staged for the next child launch. Patch 0.17.7 extends the live media control file with base64-encoded camera source, camera profile, microphone source, and speaker sink choices; the relay child now rebuilds the affected camera, mic, or speaker pipeline when those selections change. diff --git a/Cargo.lock b/Cargo.lock index 9abfa9e..2c1b573 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.17.6" +version = "0.17.7" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.17.6" +version = "0.17.7" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.17.6" +version = "0.17.7" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 083e239..c3b6c69 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.17.6" +version = "0.17.7" edition = "2024" [dependencies] diff --git a/client/src/app/downlink_media.rs b/client/src/app/downlink_media.rs index 226a9ce..4696f75 100644 --- a/client/src/app/downlink_media.rs +++ b/client/src/app/downlink_media.rs @@ -70,7 +70,6 @@ impl LesavkaClientApp { #[cfg(not(coverage))] async fn audio_loop( ep: Channel, - out: AudioOut, media_controls: crate::live_media_control::LiveMediaControls, ) { let mut consecutive_source_failures = 0_u32; @@ -92,6 +91,24 @@ impl LesavkaClientApp { paused = false; delay = Duration::from_secs(1); } + let audio_sink_choice = media_controls.refresh().audio_sink; + let active_sink = audio_sink_choice.resolve(None); + let out = match audio_sink_choice { + crate::live_media_control::MediaDeviceChoice::Auto => AudioOut::new_default_sink(), + crate::live_media_control::MediaDeviceChoice::Inherit => AudioOut::new(), + crate::live_media_control::MediaDeviceChoice::Selected(ref sink) => { + AudioOut::new_with_sink(Some(sink)) + } + }; + let out = match out { + Ok(out) => out, + Err(err) => { + audio_failure_log.record("sink", &err.to_string()); + delay = app_support::next_delay(delay); + tokio::time::sleep(delay).await; + continue; + } + }; let mut cli = RelayClient::new(ep.clone()); let req = MonitorRequest { id: 0, @@ -108,10 +125,20 @@ impl LesavkaClientApp { let mut warned_no_packets = false; delay = Duration::from_secs(1); loop { - if !media_controls.refresh().audio { + let state = media_controls.refresh(); + if !state.audio { tracing::info!("🔇 remote audio soft-paused; closing capture stream"); break; } + let desired_sink = state.audio_sink.resolve(None); + if desired_sink != active_sink { + tracing::info!( + from = active_sink.as_deref().unwrap_or("auto"), + to = desired_sink.as_deref().unwrap_or("auto"), + "🔊 speaker sink changed; restarting live audio output pipeline" + ); + break; + } match tokio::time::timeout( Duration::from_secs(1), stream.get_mut().message(), diff --git a/client/src/app/session_lifecycle.rs b/client/src/app/session_lifecycle.rs index e803bfc..15f02be 100644 --- a/client/src/app/session_lifecycle.rs +++ b/client/src/app/session_lifecycle.rs @@ -58,11 +58,19 @@ impl LesavkaClientApp { let caps = handshake::negotiate(&self.server_addr).await; tracing::info!("🤝 server capabilities = {:?}", caps); let camera_cfg = app_support::camera_config_from_caps(&caps); + let initial_cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok(); + let initial_cam_profile = initial_camera_profile_id_from_env(); + let initial_mic_source = std::env::var("LESAVKA_MIC_SOURCE").ok(); + let initial_audio_sink = std::env::var("LESAVKA_AUDIO_SINK").ok(); let media_controls = crate::live_media_control::LiveMediaControls::from_env( - crate::live_media_control::MediaControlState::new( + crate::live_media_control::MediaControlState::with_devices( caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(), caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(), std::env::var("LESAVKA_AUDIO_DISABLE").is_err(), + initial_cam_source.clone(), + initial_cam_profile.clone(), + initial_mic_source.clone(), + initial_audio_sink.clone(), ), ); let media_state = media_controls.refresh(); @@ -213,13 +221,8 @@ impl LesavkaClientApp { /*────────── audio renderer & puller ───────────*/ if std::env::var("LESAVKA_AUDIO_DISABLE").is_err() { - let audio_out = AudioOut::new()?; let ep_audio = vid_ep.clone(); - tokio::spawn(Self::audio_loop( - ep_audio, - audio_out, - media_controls.clone(), - )); + tokio::spawn(Self::audio_loop(ep_audio, media_controls.clone())); } else { info!("🔇 remote audio disabled for this relay session"); } @@ -238,79 +241,29 @@ impl LesavkaClientApp { ); } let ep = vid_ep.clone(); - let cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok(); let cam_telemetry = uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera); let media_controls = media_controls.clone(); - tokio::spawn(async move { - let result = tokio::task::spawn_blocking(move || { - CameraCapture::new(cam_source.as_deref(), camera_cfg) - }) - .await; - match result { - Ok(Ok(cam)) => { - let cam = Arc::new(cam); - tokio::spawn(Self::cam_loop( - ep, - cam, - cam_telemetry.clone(), - media_controls.clone(), - )); - } - Ok(Err(err)) => { - cam_telemetry.record_disconnect(format!( - "webcam uplink setup failed: {err:#}" - )); - warn!( - "📸 webcam uplink is unavailable for this relay session; continuing without StreamCamera: {err:#}" - ); - } - Err(err) => { - cam_telemetry.record_disconnect(format!( - "webcam uplink setup task failed: {err}" - )); - warn!( - "📸 webcam uplink setup task failed before StreamCamera could start: {err}" - ); - } - } - }); + tokio::spawn(Self::cam_loop( + ep, + initial_cam_source.clone(), + initial_cam_profile.clone(), + camera_cfg, + cam_telemetry, + media_controls, + )); } if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() { let ep = vid_ep.clone(); let mic_telemetry = uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone); let media_controls = media_controls.clone(); - tokio::spawn(async move { - let result = tokio::task::spawn_blocking(MicrophoneCapture::new).await; - match result { - Ok(Ok(mic)) => { - let mic = Arc::new(mic); - tokio::spawn(Self::voice_loop( - ep, - mic, - mic_telemetry.clone(), - media_controls.clone(), - )); - } - Ok(Err(err)) => { - mic_telemetry.record_disconnect(format!( - "microphone uplink setup failed: {err:#}" - )); - warn!( - "🎤 microphone uplink is unavailable for this relay session; continuing without StreamMicrophone: {err:#}" - ); - } - Err(err) => { - mic_telemetry.record_disconnect(format!( - "microphone uplink setup task failed: {err}" - )); - warn!( - "🎤 microphone uplink setup task failed before StreamMicrophone could start: {err}" - ); - } - } - }); + tokio::spawn(Self::voice_loop( + ep, + initial_mic_source.clone(), + mic_telemetry, + media_controls, + )); } /*────────── central reactor ───────────────────*/ diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index 1a55e8d..61f48b5 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -3,7 +3,7 @@ impl LesavkaClientApp { #[cfg(not(coverage))] async fn voice_loop( ep: Channel, - mic: Arc, + initial_source: Option, telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, media_controls: crate::live_media_control::LiveMediaControls, ) { @@ -11,6 +11,48 @@ impl LesavkaClientApp { static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { + let state = media_controls.refresh(); + if !state.microphone { + telemetry.record_enabled(false); + tokio::time::sleep(Duration::from_millis(100)).await; + continue; + } + let microphone_source_choice = state.microphone_source.clone(); + let active_source = microphone_source_choice.resolve(initial_source.as_deref()); + let use_default_source = matches!( + microphone_source_choice, + crate::live_media_control::MediaDeviceChoice::Auto + ) && active_source.is_none(); + let setup_source = active_source.clone(); + let result = tokio::task::spawn_blocking(move || { + if use_default_source { + MicrophoneCapture::new_default_source() + } else { + MicrophoneCapture::new_with_source(setup_source.as_deref()) + } + }) + .await; + let mic = match result { + Ok(Ok(mic)) => Arc::new(mic), + Ok(Err(err)) => { + telemetry.record_disconnect(format!("microphone uplink setup failed: {err:#}")); + warn!( + "🎤 microphone uplink setup failed for {:?}: {err:#}", + active_source.as_deref().unwrap_or("auto") + ); + delay = app_support::next_delay(delay); + tokio::time::sleep(delay).await; + continue; + } + Err(err) => { + telemetry.record_disconnect(format!("microphone uplink setup task failed: {err}")); + warn!("🎤 microphone uplink setup task failed before StreamMicrophone could start: {err}"); + delay = app_support::next_delay(delay); + tokio::time::sleep(delay).await; + continue; + } + }; + telemetry.record_reconnect_attempt(); let mut cli = RelayClient::new(ep.clone()); let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(AUDIO_UPLINK_QUEUE); @@ -55,10 +97,24 @@ impl LesavkaClientApp { let queue_thread = queue.clone(); let drop_log_thread = Arc::clone(&drop_log); let media_controls_thread = media_controls.clone(); + let initial_source_thread = initial_source.clone(); + let active_source_thread = active_source.clone(); let mic_worker = std::thread::spawn(move || { let mut paused = false; while stop_rx.try_recv().is_err() { - if !media_controls_thread.refresh().microphone { + let state = media_controls_thread.refresh(); + let desired_source = state + .microphone_source + .resolve(initial_source_thread.as_deref()); + if desired_source != active_source_thread { + tracing::info!( + from = active_source_thread.as_deref().unwrap_or("auto"), + to = desired_source.as_deref().unwrap_or("auto"), + "🎤 microphone source changed; restarting live uplink pipeline" + ); + break; + } + if !state.microphone { if !paused { telemetry_thread.record_enabled(false); tracing::info!("🎤 microphone uplink soft-paused"); @@ -123,13 +179,56 @@ impl LesavkaClientApp { #[cfg(not(coverage))] async fn cam_loop( ep: Channel, - cam: Arc, + initial_source: Option, + initial_profile: Option, + camera_cfg: Option, telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, media_controls: crate::live_media_control::LiveMediaControls, ) { let mut delay = Duration::from_secs(1); loop { + let state = media_controls.refresh(); + if !state.camera { + telemetry.record_enabled(false); + tokio::time::sleep(Duration::from_millis(100)).await; + continue; + } + let active_source = state.camera_source.resolve(initial_source.as_deref()); + let active_profile = state.camera_profile.resolve(initial_profile.as_deref()); + let capture_profile = active_profile + .as_deref() + .and_then(parse_camera_profile_id); + let setup_source = active_source.clone(); + let result = tokio::task::spawn_blocking(move || { + CameraCapture::new_with_capture_profile( + setup_source.as_deref(), + camera_cfg, + capture_profile, + ) + }) + .await; + let cam = match result { + Ok(Ok(cam)) => Arc::new(cam), + Ok(Err(err)) => { + telemetry.record_disconnect(format!("webcam uplink setup failed: {err:#}")); + warn!( + "📸 webcam uplink setup failed for {:?}: {err:#}", + active_source.as_deref().unwrap_or("auto") + ); + delay = app_support::next_delay(delay); + tokio::time::sleep(delay).await; + continue; + } + Err(err) => { + telemetry.record_disconnect(format!("webcam uplink setup task failed: {err}")); + warn!("📸 webcam uplink setup task failed before StreamCamera could start: {err}"); + delay = app_support::next_delay(delay); + tokio::time::sleep(delay).await; + continue; + } + }; + telemetry.record_reconnect_attempt(); let mut cli = RelayClient::new(ep.clone()); let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(VIDEO_UPLINK_QUEUE); @@ -173,21 +272,68 @@ impl LesavkaClientApp { let queue = queue.clone(); let drop_log = Arc::clone(&drop_log); let media_controls = media_controls.clone(); + let initial_source_thread = initial_source.clone(); + let active_source_thread = active_source.clone(); + let initial_profile_thread = initial_profile.clone(); + let active_profile_thread = active_profile.clone(); move || loop { if stop_rx.try_recv().is_ok() { break; } - if !media_controls.refresh().camera { + let state = media_controls.refresh(); + let desired_source = + state.camera_source.resolve(initial_source_thread.as_deref()); + let desired_profile = + state.camera_profile.resolve(initial_profile_thread.as_deref()); + if desired_source != active_source_thread + || desired_profile != active_profile_thread + { + tracing::info!( + from = active_source_thread.as_deref().unwrap_or("auto"), + to = desired_source.as_deref().unwrap_or("auto"), + "📸 webcam source changed; restarting live uplink pipeline" + ); + break; + } + if !state.camera { telemetry.record_enabled(false); tracing::info!("📸 webcam uplink soft-paused"); - while stop_rx.try_recv().is_err() - && !media_controls.refresh().camera - { + while stop_rx.try_recv().is_err() { + let state = media_controls.refresh(); + let desired_source = + state.camera_source.resolve(initial_source_thread.as_deref()); + let desired_profile = state + .camera_profile + .resolve(initial_profile_thread.as_deref()); + if desired_source != active_source_thread + || desired_profile != active_profile_thread + { + break; + } + if state.camera { + break; + } std::thread::sleep(Duration::from_millis(25)); } if stop_rx.try_recv().is_ok() { break; } + let state = media_controls.refresh(); + let desired_source = + state.camera_source.resolve(initial_source_thread.as_deref()); + let desired_profile = state + .camera_profile + .resolve(initial_profile_thread.as_deref()); + if desired_source != active_source_thread + || desired_profile != active_profile_thread + { + tracing::info!( + from = active_source_thread.as_deref().unwrap_or("auto"), + to = desired_source.as_deref().unwrap_or("auto"), + "📸 webcam source changed while paused; restarting live uplink pipeline" + ); + break; + } telemetry.record_enabled(true); tracing::info!("📸 webcam uplink resumed"); } @@ -248,6 +394,24 @@ impl LesavkaClientApp { } } +#[cfg(not(coverage))] +fn initial_camera_profile_id_from_env() -> Option { + let width = std::env::var("LESAVKA_CAM_WIDTH").ok()?; + let height = std::env::var("LESAVKA_CAM_HEIGHT").ok()?; + let fps = std::env::var("LESAVKA_CAM_FPS").ok()?; + Some(format!("{width}x{height}@{fps}")) +} + +#[cfg(not(coverage))] +fn parse_camera_profile_id(raw: &str) -> Option<(u32, u32, u32)> { + let (size, fps) = raw.split_once('@')?; + let (width, height) = size.split_once('x')?; + let width = width.parse().ok()?; + let height = height.parse().ok()?; + let fps = fps.parse().ok()?; + (width > 0 && height > 0 && fps > 0).then_some((width, height, fps)) +} + #[cfg(not(coverage))] const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = crate::uplink_fresh_queue::FreshQueueConfig { diff --git a/client/src/input/camera/capture_pipeline.rs b/client/src/input/camera/capture_pipeline.rs index b9da408..c7a932c 100644 --- a/client/src/input/camera/capture_pipeline.rs +++ b/client/src/input/camera/capture_pipeline.rs @@ -1,5 +1,13 @@ impl CameraCapture { pub fn new(device_fragment: Option<&str>, cfg: Option) -> anyhow::Result { + Self::new_with_capture_profile(device_fragment, cfg, None) + } + + pub fn new_with_capture_profile( + device_fragment: Option<&str>, + cfg: Option, + capture_profile_override: Option<(u32, u32, u32)>, + ) -> anyhow::Result { gst::init().ok(); // Select source: V4L2 device or test pattern @@ -41,7 +49,7 @@ impl CameraCapture { |cfg| matches!(cfg.codec, CameraCodec::Mjpeg), ); let jpeg_quality = env_u32("LESAVKA_CAM_JPEG_QUALITY", 85).clamp(1, 100); - let capture_profile = resolved_capture_profile(cfg); + let capture_profile = capture_profile_override.unwrap_or_else(|| resolved_capture_profile(cfg)); let (capture_width, capture_height, capture_fps) = capture_profile; let (width, height, fps) = resolved_output_profile(cfg, capture_profile); let keyframe_interval = env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps); @@ -323,13 +331,14 @@ fn log_camera_timing_sample( source_pts_us = timing.source_pts_us.unwrap_or_default(), source_base_us = timing.source_base_us.unwrap_or_default(), capture_base_us = timing.capture_base_us.unwrap_or_default(), - capture_now_us = timing.capture_now_us, - packet_pts_us = timing.packet_pts_us, - pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128, - used_source_pts = timing.used_source_pts, - lag_clamped = timing.lag_clamped, - bytes, - "📸 upstream webcam timing sample" - ); - } + capture_now_us = timing.capture_now_us, + packet_pts_us = timing.packet_pts_us, + pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128, + used_source_pts = timing.used_source_pts, + lag_clamped = timing.lag_clamped, + lead_clamped = timing.lead_clamped, + bytes, + "📸 upstream webcam timing sample" + ); +} } diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index 7d148af..d2fd698 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -48,11 +48,31 @@ pub struct MicrophoneCapture { impl MicrophoneCapture { pub fn new() -> Result { + Self::new_with_source_and_env(None, true) + } + + pub fn new_with_source(source_override: Option<&str>) -> Result { + Self::new_with_source_and_env(source_override, true) + } + + pub fn new_default_source() -> Result { + Self::new_with_source_and_env(None, false) + } + + fn new_with_source_and_env( + source_override: Option<&str>, + allow_env_source: bool, + ) -> Result { gst::init().ok(); // idempotent /* preferred path: pipewiresrc; fallback: pulsesrc ----------------*/ - let source_desc = match std::env::var("LESAVKA_MIC_SOURCE") { - Ok(s) if !s.is_empty() => match Self::resolve_source_desc(&s) { + let selected_source = source_override.map(str::to_string).or_else(|| { + allow_env_source + .then(|| std::env::var("LESAVKA_MIC_SOURCE").ok()) + .flatten() + }); + let source_desc = match selected_source { + Some(s) if !s.is_empty() => match Self::resolve_source_desc(&s) { Some(desc) => desc, None => { warn!("🎤 requested mic '{s}' not found; using default"); @@ -166,6 +186,7 @@ impl MicrophoneCapture { timing.capture_now_us as i128 - timing.packet_pts_us as i128, used_source_pts = timing.used_source_pts, lag_clamped = timing.lag_clamped, + lead_clamped = timing.lead_clamped, bytes = map.len(), packet_duration_us, split_packets = packet_count, diff --git a/client/src/launcher/tests/ui_runtime.rs b/client/src/launcher/tests/ui_runtime.rs index 09fd8e5..620a7de 100644 --- a/client/src/launcher/tests/ui_runtime.rs +++ b/client/src/launcher/tests/ui_runtime.rs @@ -569,12 +569,22 @@ fn write_media_control_request_formats_soft_pause_state() { state.set_camera_channel_enabled(true); state.set_microphone_channel_enabled(false); state.set_audio_channel_enabled(true); + state.select_camera(Some("Logitech BRIO".to_string())); + state.select_camera_quality(Some(CameraMode::new(1280, 720, 30))); + state.select_microphone(Some( + "alsa_input.usb-Neat Microphones Bumblebee".to_string(), + )); + state.select_speaker(Some("bluez_output.80_C3_BA_76_26_AB.1".to_string())); write_media_control_request(&path, &state).expect("write media control"); let raw = std::fs::read_to_string(path).expect("read media control"); assert!(raw.contains("camera=1"), "{raw}"); assert!(raw.contains("microphone=0"), "{raw}"); assert!(raw.contains("audio=1"), "{raw}"); + assert!(raw.contains("camera_source=b64:"), "{raw}"); + assert!(raw.contains("camera_profile=b64:"), "{raw}"); + assert!(raw.contains("microphone_source=b64:"), "{raw}"); + assert!(raw.contains("audio_sink=b64:"), "{raw}"); } #[gtk::test] diff --git a/client/src/launcher/ui/control_requests.rs b/client/src/launcher/ui/control_requests.rs index 84f2913..006d33d 100644 --- a/client/src/launcher/ui/control_requests.rs +++ b/client/src/launcher/ui/control_requests.rs @@ -135,6 +135,31 @@ fn apply_media_control_change( } } +#[cfg(not(coverage))] +/// Apply a live media device-selection change by asking the relay child to rebuild that pipeline. +fn apply_live_media_device_change( + state_snapshot: &LauncherState, + widgets: &super::ui_components::LauncherWidgets, + child_proc: &Rc>>, + feed_label: &str, +) { + let relay_live = child_proc + .try_borrow() + .map(|child| child.is_some()) + .unwrap_or(false); + if relay_live { + let path = media_control_path(); + match write_media_control_request(&path, state_snapshot) { + Ok(()) => widgets.status_label.set_text(&format!( + "{feed_label} selection applied to the live relay; the stream is restarting." + )), + Err(err) => widgets.status_label.set_text(&format!( + "{feed_label} selection is staged for the next relay launch, but live device control could not be written: {err}" + )), + } + } +} + #[cfg(not(coverage))] /// Refresh relay capture-power state in the background so GTK stays responsive. fn request_capture_power_refresh( diff --git a/client/src/launcher/ui/media_device_bindings.rs b/client/src/launcher/ui/media_device_bindings.rs index a6a55a7..ba39312 100644 --- a/client/src/launcher/ui/media_device_bindings.rs +++ b/client/src/launcher/ui/media_device_bindings.rs @@ -12,8 +12,11 @@ .select_microphone(selected_combo_value(µphone_combo_read)); let relay_live = child_proc.borrow().is_some(); if relay_live { - widgets.status_label.set_text( - "Microphone selection staged for the next relay launch. Use the Mic toggle to soft-pause or resume the current live feed.", + apply_live_media_device_change( + &state.borrow(), + &widgets, + &child_proc, + "Microphone", ); } else if tests.borrow_mut().is_running(DeviceTestKind::Microphone) { widgets.status_label.set_text( @@ -41,8 +44,11 @@ tests.borrow_mut().is_running(DeviceTestKind::Microphone); let relay_live = child_proc.borrow().is_some(); if relay_live { - widgets.status_label.set_text( - "Speaker selection staged for the next relay launch. Speaker gain still applies live.", + apply_live_media_device_change( + &state.borrow(), + &widgets, + &child_proc, + "Speaker", ); } else if speaker_running || microphone_running { widgets.status_label.set_text( diff --git a/client/src/launcher/ui/stage_device_bindings.rs b/client/src/launcher/ui/stage_device_bindings.rs index e170ea3..6238ae2 100644 --- a/client/src/launcher/ui/stage_device_bindings.rs +++ b/client/src/launcher/ui/stage_device_bindings.rs @@ -31,8 +31,11 @@ .status_label .set_text(&format!("Camera quality update failed: {err}")); } else if child_proc.borrow().is_some() { - widgets.status_label.set_text( - "Camera selection staged for the next relay launch. Use the Camera toggle to soft-pause or resume the current live feed.", + apply_live_media_device_change( + &state.borrow(), + &widgets, + &child_proc, + "Camera", ); } else if preview_was_running { widgets.status_label.set_text(&format!( @@ -73,8 +76,11 @@ .status_label .set_text(&format!("Camera quality update failed: {err}")); } else if child_proc.borrow().is_some() { - widgets.status_label.set_text( - "Camera quality staged for the next relay launch. The live feed keeps its current capture pipeline.", + apply_live_media_device_change( + &state.borrow(), + &widgets, + &child_proc, + "Camera quality", ); } else if preview_was_running { widgets.status_label.set_text(&format!( diff --git a/client/src/launcher/ui_runtime/control_paths.rs b/client/src/launcher/ui_runtime/control_paths.rs index a2054d8..4d26d1a 100644 --- a/client/src/launcher/ui_runtime/control_paths.rs +++ b/client/src/launcher/ui_runtime/control_paths.rs @@ -135,10 +135,14 @@ pub fn write_mic_gain_request(path: &Path, gain_percent: u32) -> Result<()> { pub fn write_media_control_request(path: &Path, state: &LauncherState) -> Result<()> { crate::live_media_control::write_media_control_request( path, - crate::live_media_control::MediaControlState::new( + crate::live_media_control::MediaControlState::with_devices( state.channels.camera, state.channels.microphone, state.channels.audio, + state.devices.camera.clone(), + state.camera_quality.map(|mode| mode.id()), + state.devices.microphone.clone(), + state.devices.speaker.clone(), ), )?; Ok(()) diff --git a/client/src/live_capture_clock.rs b/client/src/live_capture_clock.rs index 194369e..a6ad58b 100644 --- a/client/src/live_capture_clock.rs +++ b/client/src/live_capture_clock.rs @@ -5,6 +5,7 @@ use std::time::{Duration, Instant}; static CAPTURE_ORIGIN: OnceLock = OnceLock::new(); const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250; +const DEFAULT_SOURCE_LEAD_CAP_MS: u64 = 80; fn origin() -> Instant { *CAPTURE_ORIGIN.get_or_init(Instant::now) @@ -71,6 +72,22 @@ pub fn upstream_source_lag_cap() -> Duration { .unwrap_or_else(|| Duration::from_millis(DEFAULT_SOURCE_LAG_CAP_MS)) } +/// Cap how far source-derived packet timestamps may lead the live capture clock. +/// +/// Inputs: none. +/// Outputs: the maximum tolerated future lead for source-based packet PTS. +/// Why: live sources can flush a burst of future-stamped buffers; if those +/// future timestamps escape, the server freezes media waiting for local backlog. +#[must_use] +pub fn upstream_source_lead_cap() -> Duration { + std::env::var("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS") + .ok() + .and_then(|raw| raw.trim().parse::().ok()) + .filter(|value| *value > 0) + .map(Duration::from_millis) + .unwrap_or_else(|| Duration::from_millis(DEFAULT_SOURCE_LEAD_CAP_MS)) +} + #[derive(Debug, Default)] struct SourcePtsRebaserState { source_base_us: Option, @@ -102,11 +119,13 @@ pub struct RebasedSourcePts { pub capture_base_us: Option, pub used_source_pts: bool, pub lag_clamped: bool, + pub lead_clamped: bool, } #[derive(Debug, Default)] struct DurationPacedSourcePtsState { next_packet_pts_us: Option, + last_packet_pts_us: Option, } /// Rebase encoded packet timing by anchoring once, then pacing by duration. @@ -162,6 +181,7 @@ impl SourcePtsRebaser { let mut packet_pts_us = capture_now_us; let mut used_source_pts = false; let mut lag_clamped = false; + let mut lead_clamped = false; if let Some(source_pts_us) = source_pts_us { let source_base_us = *state.source_base_us.get_or_insert(source_pts_us); @@ -179,6 +199,14 @@ impl SourcePtsRebaser { packet_pts_us = lag_floor_us; lag_clamped = true; } + let lead_ceiling_us = + capture_now_us.saturating_add( + upstream_source_lead_cap().as_micros().min(u64::MAX as u128) as u64, + ); + if packet_pts_us > lead_ceiling_us { + packet_pts_us = lead_ceiling_us; + lead_clamped = true; + } } if let Some(last_packet_pts_us) = state.last_packet_pts_us @@ -196,6 +224,7 @@ impl SourcePtsRebaser { capture_base_us: state.capture_base_us, used_source_pts, lag_clamped, + lead_clamped, } } } @@ -232,6 +261,19 @@ impl DurationPacedSourcePtsRebaser { packet_pts_us = lag_floor_us; rebased.lag_clamped = true; } + let lead_ceiling_us = rebased + .capture_now_us + .saturating_add(upstream_source_lead_cap().as_micros().min(u64::MAX as u128) as u64); + if packet_pts_us > lead_ceiling_us { + packet_pts_us = lead_ceiling_us; + rebased.lead_clamped = true; + } + if let Some(last_packet_pts_us) = state.last_packet_pts_us + && packet_pts_us <= last_packet_pts_us + { + packet_pts_us = last_packet_pts_us.saturating_add(1); + } + state.last_packet_pts_us = Some(packet_pts_us); state.next_packet_pts_us = Some(packet_pts_us.saturating_add(step_us)); rebased.packet_pts_us = packet_pts_us; rebased @@ -242,7 +284,7 @@ impl DurationPacedSourcePtsRebaser { mod tests { use super::{ DurationPacedSourcePtsRebaser, SourcePtsRebaser, capture_pts_us, packet_age, - upstream_source_lag_cap, upstream_timing_trace_enabled, + upstream_source_lag_cap, upstream_source_lead_cap, upstream_timing_trace_enabled, }; use serial_test::serial; use std::time::Duration; @@ -306,6 +348,8 @@ mod tests { assert!(second.packet_pts_us > first.packet_pts_us); assert!(!first.lag_clamped); assert!(!second.lag_clamped); + assert!(!first.lead_clamped); + assert!(!second.lead_clamped); } #[test] @@ -323,6 +367,23 @@ mod tests { assert!(second.capture_now_us - second.packet_pts_us <= 2_500); } + #[test] + #[serial] + fn source_pts_rebaser_clamps_source_lead_when_it_runs_too_far_ahead() { + temp_env::with_var("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS", Some("5"), || { + let rebased = SourcePtsRebaser::default(); + let _first = + rebased.rebase_with_lag_cap(Some(1_000_000), 1, Some(Duration::from_millis(250))); + let second = + rebased.rebase_with_lag_cap(Some(2_000_000), 1, Some(Duration::from_millis(250))); + + assert!(second.used_source_pts); + assert!(second.lead_clamped); + assert!(second.packet_pts_us >= second.capture_now_us); + assert!(second.packet_pts_us <= second.capture_now_us + 5_500); + }); + } + #[test] #[serial] fn source_pts_rebasers_anchor_each_stream_to_its_own_first_packet_time() { @@ -373,6 +434,18 @@ mod tests { }); } + #[test] + #[serial] + fn upstream_source_lead_cap_defaults_and_accepts_override() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS", || { + assert_eq!(upstream_source_lead_cap(), Duration::from_millis(80)); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS", Some("35"), || { + assert_eq!(upstream_source_lead_cap(), Duration::from_millis(35)); + }); + } + #[test] #[serial] fn duration_paced_rebaser_advances_by_packet_duration_when_source_pts_stretch() { @@ -402,4 +475,24 @@ mod tests { "duration-paced packet pts should never trail live capture by more than the lag cap" ); } + + #[test] + #[serial] + fn duration_paced_rebaser_clamps_when_duration_pacing_runs_future() { + temp_env::with_var("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS", Some("15"), || { + let rebased = DurationPacedSourcePtsRebaser::default(); + let mut last = + rebased.rebase_with_packet_duration(Some(0), 50_000, Duration::from_millis(250)); + for packet_index in 1..12 { + last = rebased.rebase_with_packet_duration( + Some(packet_index * 50_000), + 50_000, + Duration::from_millis(250), + ); + } + + assert!(last.lead_clamped); + assert!(last.packet_pts_us <= last.capture_now_us + 16_000); + }); + } } diff --git a/client/src/live_media_control.rs b/client/src/live_media_control.rs index 61796b3..4708bb9 100644 --- a/client/src/live_media_control.rs +++ b/client/src/live_media_control.rs @@ -7,23 +7,80 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; +use base64::{Engine as _, engine::general_purpose::STANDARD as B64}; + pub const MEDIA_CONTROL_ENV: &str = "LESAVKA_MEDIA_CONTROL"; pub const DEFAULT_MEDIA_CONTROL_PATH: &str = "/tmp/lesavka-media.control"; -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum MediaDeviceChoice { + Inherit, + Auto, + Selected(String), +} + +impl MediaDeviceChoice { + #[must_use] + pub fn from_selection(selection: Option) -> Self { + selection + .filter(|value| !value.trim().is_empty()) + .map(Self::Selected) + .unwrap_or(Self::Auto) + } + + #[must_use] + pub fn resolve(&self, fallback: Option<&str>) -> Option { + match self { + Self::Inherit => fallback.map(str::to_string), + Self::Auto => None, + Self::Selected(value) => Some(value.clone()), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct MediaControlState { pub camera: bool, pub microphone: bool, pub audio: bool, + pub camera_source: MediaDeviceChoice, + pub camera_profile: MediaDeviceChoice, + pub microphone_source: MediaDeviceChoice, + pub audio_sink: MediaDeviceChoice, } impl MediaControlState { #[must_use] - pub const fn new(camera: bool, microphone: bool, audio: bool) -> Self { + pub fn new(camera: bool, microphone: bool, audio: bool) -> Self { Self { camera, microphone, audio, + camera_source: MediaDeviceChoice::Inherit, + camera_profile: MediaDeviceChoice::Inherit, + microphone_source: MediaDeviceChoice::Inherit, + audio_sink: MediaDeviceChoice::Inherit, + } + } + + #[must_use] + pub fn with_devices( + camera: bool, + microphone: bool, + audio: bool, + camera_source: Option, + camera_profile: Option, + microphone_source: Option, + audio_sink: Option, + ) -> Self { + Self { + camera, + microphone, + audio, + camera_source: MediaDeviceChoice::from_selection(camera_source), + camera_profile: MediaDeviceChoice::from_selection(camera_profile), + microphone_source: MediaDeviceChoice::from_selection(microphone_source), + audio_sink: MediaDeviceChoice::from_selection(audio_sink), } } } @@ -63,11 +120,11 @@ impl LiveMediaControls { { inner.state = state; } - inner.state + inner.state.clone() } } -/// Writes one atomic-ish soft-pause request for the running relay child to poll. +/// Writes one atomic-ish soft-pause/device request for the running relay child to poll. pub(crate) fn write_media_control_request( path: &Path, state: MediaControlState, @@ -75,11 +132,15 @@ pub(crate) fn write_media_control_request( fs::write( path, format!( - "camera={} microphone={} audio={} {}\n", + "camera={} microphone={} audio={} camera_source={} camera_profile={} microphone_source={} audio_sink={} nonce={}\n", bool_flag(state.camera), bool_flag(state.microphone), bool_flag(state.audio), - control_request_nonce() + encode_choice(&state.camera_source), + encode_choice(&state.camera_profile), + encode_choice(&state.microphone_source), + encode_choice(&state.audio_sink), + control_request_nonce(), ), ) } @@ -89,6 +150,10 @@ fn parse_media_control_state(raw: &str) -> Option { let mut camera = None; let mut microphone = None; let mut audio = None; + let mut camera_source = MediaDeviceChoice::Inherit; + let mut camera_profile = MediaDeviceChoice::Inherit; + let mut microphone_source = MediaDeviceChoice::Inherit; + let mut audio_sink = MediaDeviceChoice::Inherit; for token in raw.split_ascii_whitespace() { let Some((key, value)) = token.split_once('=') else { continue; @@ -97,6 +162,12 @@ fn parse_media_control_state(raw: &str) -> Option { "camera" => camera = Some(parse_bool_flag(value)?), "microphone" | "mic" => microphone = Some(parse_bool_flag(value)?), "audio" | "speaker" => audio = Some(parse_bool_flag(value)?), + "camera_source" | "camera_source_b64" => camera_source = parse_choice(value)?, + "camera_profile" | "camera_quality" => camera_profile = parse_choice(value)?, + "microphone_source" | "mic_source" | "microphone_source_b64" => { + microphone_source = parse_choice(value)?; + } + "audio_sink" | "speaker_sink" | "audio_sink_b64" => audio_sink = parse_choice(value)?, _ => {} } } @@ -104,9 +175,37 @@ fn parse_media_control_state(raw: &str) -> Option { camera: camera?, microphone: microphone?, audio: audio?, + camera_source, + camera_profile, + microphone_source, + audio_sink, }) } +fn encode_choice(choice: &MediaDeviceChoice) -> String { + match choice { + MediaDeviceChoice::Inherit => "inherit".to_string(), + MediaDeviceChoice::Auto => "auto".to_string(), + MediaDeviceChoice::Selected(value) => format!("b64:{}", B64.encode(value.as_bytes())), + } +} + +fn parse_choice(value: &str) -> Option { + let value = value.trim(); + if value.is_empty() || value.eq_ignore_ascii_case("auto") { + return Some(MediaDeviceChoice::Auto); + } + if value.eq_ignore_ascii_case("inherit") { + return Some(MediaDeviceChoice::Inherit); + } + if let Some(encoded) = value.strip_prefix("b64:") { + let decoded = B64.decode(encoded).ok()?; + let decoded = String::from_utf8(decoded).ok()?; + return Some(MediaDeviceChoice::from_selection(Some(decoded))); + } + Some(MediaDeviceChoice::from_selection(Some(value.to_string()))) +} + fn parse_bool_flag(value: &str) -> Option { match value.trim().to_ascii_lowercase().as_str() { "1" | "true" | "on" | "yes" => Some(true), @@ -138,6 +237,37 @@ mod tests { ); } + #[test] + fn parses_media_control_state_with_live_device_choices() { + let state = MediaControlState::with_devices( + true, + true, + true, + Some("Logitech BRIO".to_string()), + Some("1280x720@30".to_string()), + Some("alsa_input.usb-Neat Microphones".to_string()), + None, + ); + let dir = tempfile::tempdir().expect("tempdir"); + let path = dir.path().join("media.control"); + write_media_control_request(&path, state.clone()).expect("write controls"); + let raw = fs::read_to_string(path).expect("read controls"); + assert_eq!(parse_media_control_state(&raw), Some(state)); + } + + #[test] + fn device_choices_resolve_inherit_auto_and_selected() { + assert_eq!( + MediaDeviceChoice::Inherit.resolve(Some("env-device")), + Some("env-device".to_string()) + ); + assert_eq!(MediaDeviceChoice::Auto.resolve(Some("env-device")), None); + assert_eq!( + MediaDeviceChoice::Selected("chosen".to_string()).resolve(Some("env-device")), + Some("chosen".to_string()) + ); + } + #[test] fn live_media_controls_refresh_after_file_changes() { let dir = tempfile::tempdir().expect("tempdir"); diff --git a/client/src/output/audio.rs b/client/src/output/audio.rs index 3f073fb..673da68 100644 --- a/client/src/output/audio.rs +++ b/client/src/output/audio.rs @@ -35,8 +35,23 @@ struct AudioTimeline { impl AudioOut { pub fn new() -> anyhow::Result { + Self::new_with_sink_and_env(None, true) + } + + pub fn new_with_sink(sink_override: Option<&str>) -> anyhow::Result { + Self::new_with_sink_and_env(sink_override, true) + } + + pub fn new_default_sink() -> anyhow::Result { + Self::new_with_sink_and_env(None, false) + } + + fn new_with_sink_and_env( + sink_override: Option<&str>, + allow_env_sink: bool, + ) -> anyhow::Result { gst::init().context("initialising GStreamer")?; - let sink = pick_sink_element()?; + let sink = pick_sink_element(sink_override, allow_env_sink)?; let tee_dump = std::env::var("LESAVKA_TAP_AUDIO") .ok() .as_deref() @@ -261,8 +276,13 @@ impl Drop for AudioOut { } #[cfg(not(coverage))] -fn pick_sink_element() -> Result { - if let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") { +fn pick_sink_element(sink_override: Option<&str>, allow_env_sink: bool) -> Result { + if let Some(s) = sink_override.filter(|value| !value.trim().is_empty()) { + let sink = normalize_sink_override(s); + info!("💪 sink overridden via live media control={s} -> {sink}"); + return Ok(sink); + } + if allow_env_sink && let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") { let sink = normalize_sink_override(&s); info!( "💪 sink overridden via LESAVKA_AUDIO_SINK={} -> {}", @@ -280,8 +300,11 @@ fn pick_sink_element() -> Result { } #[cfg(coverage)] -fn pick_sink_element() -> Result { - if let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") { +fn pick_sink_element(sink_override: Option<&str>, allow_env_sink: bool) -> Result { + if let Some(s) = sink_override.filter(|value| !value.trim().is_empty()) { + return Ok(normalize_sink_override(s)); + } + if allow_env_sink && let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") { return Ok(normalize_sink_override(&s)); } if let Some((n, _)) = list_pw_sinks().first() { diff --git a/common/Cargo.toml b/common/Cargo.toml index c709df9..424ded3 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.17.6" +version = "0.17.7" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 9081848..0b340b9 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.17.6" +version = "0.17.7" edition = "2024" autobins = false diff --git a/testing/tests/client_camera_include_contract.rs b/testing/tests/client_camera_include_contract.rs index 4b3395b..822beb3 100644 --- a/testing/tests/client_camera_include_contract.rs +++ b/testing/tests/client_camera_include_contract.rs @@ -465,6 +465,7 @@ mod camera_include_contract { capture_base_us: Some(7_345), used_source_pts: true, lag_clamped: false, + lead_clamped: false, }, 256, ); diff --git a/testing/tests/support/live_capture_clock_shim.rs b/testing/tests/support/live_capture_clock_shim.rs index 1ac4068..74040c7 100644 --- a/testing/tests/support/live_capture_clock_shim.rs +++ b/testing/tests/support/live_capture_clock_shim.rs @@ -10,6 +10,7 @@ use std::sync::{Mutex, OnceLock}; use std::time::{Duration, Instant}; const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250; +const DEFAULT_SOURCE_LEAD_CAP_MS: u64 = 80; fn capture_clock_origin() -> &'static Instant { static ORIGIN: OnceLock = OnceLock::new(); @@ -46,6 +47,15 @@ pub fn upstream_source_lag_cap() -> Duration { .unwrap_or_else(|| Duration::from_millis(DEFAULT_SOURCE_LAG_CAP_MS)) } +pub fn upstream_source_lead_cap() -> Duration { + std::env::var("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS") + .ok() + .and_then(|raw| raw.trim().parse::().ok()) + .filter(|value| *value > 0) + .map(Duration::from_millis) + .unwrap_or_else(|| Duration::from_millis(DEFAULT_SOURCE_LEAD_CAP_MS)) +} + #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] pub struct RebasedSourcePts { pub packet_pts_us: u64, @@ -55,6 +65,7 @@ pub struct RebasedSourcePts { pub capture_base_us: Option, pub used_source_pts: bool, pub lag_clamped: bool, + pub lead_clamped: bool, } #[derive(Debug, Default)] @@ -72,6 +83,7 @@ pub struct SourcePtsRebaser { #[derive(Debug, Default)] struct DurationPacedSourcePtsState { next_packet_pts_us: Option, + last_packet_pts_us: Option, } #[derive(Debug, Default)] @@ -99,6 +111,7 @@ impl SourcePtsRebaser { let mut packet_pts_us = capture_now_us; let mut used_source_pts = false; let mut lag_clamped = false; + let mut lead_clamped = false; if let Some(source_pts_us) = source_pts_us { let source_base_us = *state.source_base_us.get_or_insert(source_pts_us); @@ -115,6 +128,15 @@ impl SourcePtsRebaser { packet_pts_us = lag_floor_us; lag_clamped = true; } + let lead_ceiling_us = capture_now_us.saturating_add( + upstream_source_lead_cap() + .as_micros() + .min(u64::MAX as u128) as u64, + ); + if packet_pts_us > lead_ceiling_us { + packet_pts_us = lead_ceiling_us; + lead_clamped = true; + } } if let Some(last_packet_pts_us) = state.last_packet_pts_us @@ -132,6 +154,7 @@ impl SourcePtsRebaser { capture_base_us: state.capture_base_us, used_source_pts, lag_clamped, + lead_clamped, } } } @@ -159,6 +182,21 @@ impl DurationPacedSourcePtsRebaser { packet_pts_us = lag_floor_us; rebased.lag_clamped = true; } + let lead_ceiling_us = rebased.capture_now_us.saturating_add( + upstream_source_lead_cap() + .as_micros() + .min(u64::MAX as u128) as u64, + ); + if packet_pts_us > lead_ceiling_us { + packet_pts_us = lead_ceiling_us; + rebased.lead_clamped = true; + } + if let Some(last_packet_pts_us) = state.last_packet_pts_us + && packet_pts_us <= last_packet_pts_us + { + packet_pts_us = last_packet_pts_us.saturating_add(1); + } + state.last_packet_pts_us = Some(packet_pts_us); state.next_packet_pts_us = Some(packet_pts_us.saturating_add(step_us)); rebased.packet_pts_us = packet_pts_us; rebased