diff --git a/Cargo.lock b/Cargo.lock index e9bb1f2..a9eee68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.14.16" +version = "0.14.17" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.14.16" +version = "0.14.17" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.14.16" +version = "0.14.17" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index b0829f7..07198f6 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.14.16" +version = "0.14.17" edition = "2024" [dependencies] diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index 131aece..1f0a471 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -13,28 +13,6 @@ impl LesavkaClientApp { telemetry.record_reconnect_attempt(); let mut cli = RelayClient::new(ep.clone()); let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(AUDIO_UPLINK_QUEUE); - let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); - - let mic_clone = mic.clone(); - let telemetry_thread = telemetry.clone(); - let queue_thread = queue.clone(); - std::thread::spawn(move || { - while stop_rx.try_recv().is_err() { - if let Some(pkt) = mic_clone.pull() { - trace!("πŸŽ€πŸ“€ cli {} bytes β†’ gRPC", pkt.data.len()); - let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts); - let stats = queue_thread.push(pkt, enqueue_age); - if stats.dropped_queue_full > 0 { - telemetry_thread.record_queue_full_drop(stats.dropped_queue_full); - } - telemetry_thread.record_enqueue( - queue_depth_u32(stats.queue_depth), - duration_ms(enqueue_age), - 0.0, - ); - } - } - }); let queue_stream = queue.clone(); let telemetry_stream = telemetry.clone(); @@ -58,10 +36,34 @@ impl LesavkaClientApp { match cli.stream_microphone(Request::new(outbound)).await { Ok(mut resp) => { + let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); + let mic_clone = mic.clone(); + let telemetry_thread = telemetry.clone(); + let queue_thread = queue.clone(); + let mic_worker = std::thread::spawn(move || { + while stop_rx.try_recv().is_err() { + if let Some(pkt) = mic_clone.pull() { + trace!("πŸŽ€πŸ“€ cli {} bytes β†’ gRPC", pkt.data.len()); + let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts); + let stats = queue_thread.push(pkt, enqueue_age); + if stats.dropped_queue_full > 0 { + telemetry_thread.record_queue_full_drop(stats.dropped_queue_full); + } + telemetry_thread.record_enqueue( + queue_depth_u32(stats.queue_depth), + duration_ms(enqueue_age), + 0.0, + ); + } + } + }); delay = Duration::from_secs(1); telemetry.record_connected(); while resp.get_mut().message().await.transpose().is_some() {} telemetry.record_disconnect("microphone uplink stream ended"); + queue.close(); + let _ = stop_tx.send(()); + let _ = mic_worker.join(); } Err(e) => { telemetry.record_disconnect(format!("microphone uplink connect failed: {e}")); @@ -76,7 +78,6 @@ impl LesavkaClientApp { } queue.close(); - let _ = stop_tx.send(()); tokio::time::sleep(delay).await; } } @@ -94,41 +95,6 @@ impl LesavkaClientApp { telemetry.record_reconnect_attempt(); let mut cli = RelayClient::new(ep.clone()); let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(VIDEO_UPLINK_QUEUE); - let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); - - let cam_worker = std::thread::spawn({ - let cam = cam.clone(); - let telemetry = telemetry.clone(); - let queue = queue.clone(); - move || { - loop { - if stop_rx.try_recv().is_ok() { - break; - } - let Some(pkt) = cam.pull() else { - std::thread::sleep(Duration::from_millis(5)); - continue; - }; - static CNT: std::sync::atomic::AtomicU64 = - std::sync::atomic::AtomicU64::new(0); - let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if n < 10 || n.is_multiple_of(120) { - tracing::trace!("πŸ“Έ cli frame#{n} {} B", pkt.data.len()); - } - tracing::trace!("πŸ“Έβ¬†οΈ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); - let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts); - let stats = queue.push(pkt, enqueue_age); - if stats.dropped_queue_full > 0 { - telemetry.record_queue_full_drop(stats.dropped_queue_full); - } - telemetry.record_enqueue( - queue_depth_u32(stats.queue_depth), - duration_ms(enqueue_age), - 0.0, - ); - } - } - }); let queue_stream = queue.clone(); let telemetry_stream = telemetry.clone(); @@ -152,17 +118,50 @@ impl LesavkaClientApp { match cli.stream_camera(Request::new(outbound)).await { Ok(mut resp) => { + let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); + let cam_worker = std::thread::spawn({ + let cam = cam.clone(); + let telemetry = telemetry.clone(); + let queue = queue.clone(); + move || loop { + if stop_rx.try_recv().is_ok() { + break; + } + let Some(pkt) = cam.pull() else { + std::thread::sleep(Duration::from_millis(5)); + continue; + }; + static CNT: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); + let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n < 10 || n.is_multiple_of(120) { + tracing::trace!("πŸ“Έ cli frame#{n} {} B", pkt.data.len()); + } + tracing::trace!("πŸ“Έβ¬†οΈ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); + let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts); + let stats = queue.push(pkt, enqueue_age); + if stats.dropped_queue_full > 0 { + telemetry.record_queue_full_drop(stats.dropped_queue_full); + } + telemetry.record_enqueue( + queue_depth_u32(stats.queue_depth), + duration_ms(enqueue_age), + 0.0, + ); + } + }); delay = Duration::from_secs(1); telemetry.record_connected(); while resp.get_mut().message().await.transpose().is_some() {} telemetry.record_disconnect("camera uplink stream ended"); + queue.close(); + let _ = stop_tx.send(()); + let _ = cam_worker.join(); } Err(e) if e.code() == tonic::Code::Unimplemented => { tracing::warn!("πŸ“Έ server does not support StreamCamera – giving up"); telemetry.record_disconnect("camera uplink unavailable on server"); queue.close(); - let _ = stop_tx.send(()); - let _ = cam_worker.join(); return; } Err(e) => { @@ -173,8 +172,6 @@ impl LesavkaClientApp { } queue.close(); - let _ = stop_tx.send(()); - let _ = cam_worker.join(); tokio::time::sleep(delay).await; } } @@ -183,8 +180,8 @@ impl LesavkaClientApp { #[cfg(not(coverage))] const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = crate::uplink_fresh_queue::FreshQueueConfig { - capacity: 8, - max_age: Duration::from_millis(350), + capacity: 32, + max_age: Duration::from_secs(1), }; #[cfg(not(coverage))] diff --git a/client/src/live_capture_clock.rs b/client/src/live_capture_clock.rs index 6e2efb0..08a158e 100644 --- a/client/src/live_capture_clock.rs +++ b/client/src/live_capture_clock.rs @@ -4,12 +4,17 @@ use std::sync::{Mutex, OnceLock}; use std::time::{Duration, Instant}; static CAPTURE_ORIGIN: OnceLock = OnceLock::new(); +static SHARED_SOURCE_CAPTURE_BASE_US: OnceLock>> = OnceLock::new(); const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250; fn origin() -> Instant { *CAPTURE_ORIGIN.get_or_init(Instant::now) } +fn shared_source_capture_base_slot() -> &'static Mutex> { + SHARED_SOURCE_CAPTURE_BASE_US.get_or_init(|| Mutex::new(None)) +} + /// Return the shared live-capture timestamp for upstream camera/mic packets. /// /// Inputs: none. @@ -164,7 +169,13 @@ impl SourcePtsRebaser { if let Some(source_pts_us) = source_pts_us { let source_base_us = *state.source_base_us.get_or_insert(source_pts_us); - let capture_base_us = *state.capture_base_us.get_or_insert(capture_now_us); + let capture_base_us = { + let mut shared_capture_base_us = shared_source_capture_base_slot() + .lock() + .expect("shared source capture base mutex poisoned"); + *shared_capture_base_us.get_or_insert(capture_now_us) + }; + state.capture_base_us = Some(capture_base_us); packet_pts_us = capture_base_us.saturating_add(source_pts_us.saturating_sub(source_base_us)); used_source_pts = true; @@ -242,10 +253,19 @@ mod tests { DurationPacedSourcePtsRebaser, SourcePtsRebaser, capture_pts_us, packet_age, upstream_source_lag_cap, upstream_timing_trace_enabled, }; + use serial_test::serial; use std::time::Duration; + fn reset_shared_source_capture_base_for_tests() { + *super::shared_source_capture_base_slot() + .lock() + .expect("shared source capture base mutex poisoned") = None; + } + #[test] + #[serial] fn capture_pts_us_monotonically_advances() { + reset_shared_source_capture_base_for_tests(); let first = capture_pts_us(); std::thread::sleep(Duration::from_millis(2)); let second = capture_pts_us(); @@ -253,7 +273,9 @@ mod tests { } #[test] + #[serial] fn packet_age_is_small_for_recent_packets() { + reset_shared_source_capture_base_for_tests(); let pts = capture_pts_us(); std::thread::sleep(Duration::from_millis(2)); let age = packet_age(pts); @@ -262,7 +284,9 @@ mod tests { } #[test] + #[serial] fn source_pts_rebaser_preserves_source_delta_on_shared_capture_clock() { + reset_shared_source_capture_base_for_tests(); let rebased = SourcePtsRebaser::default(); let first = rebased.rebase_or_now(Some(1_000_000), 1); let second = rebased.rebase_or_now(Some(1_033_333), 1); @@ -278,7 +302,9 @@ mod tests { } #[test] + #[serial] fn source_pts_rebaser_stays_monotonic_when_source_pts_repeat() { + reset_shared_source_capture_base_for_tests(); let rebased = SourcePtsRebaser::default(); let first = rebased.rebase_or_now(Some(50_000), 1); let second = rebased.rebase_or_now(Some(50_000), 1); @@ -287,7 +313,9 @@ mod tests { } #[test] + #[serial] fn source_pts_rebaser_falls_back_to_capture_clock_without_source_pts() { + reset_shared_source_capture_base_for_tests(); let rebased = SourcePtsRebaser::default(); let first = rebased.rebase_or_now(None, 1); std::thread::sleep(Duration::from_millis(2)); @@ -301,7 +329,9 @@ mod tests { } #[test] + #[serial] fn source_pts_rebaser_clamps_source_lag_when_it_falls_too_far_behind_now() { + reset_shared_source_capture_base_for_tests(); let rebased = SourcePtsRebaser::default(); let _first = rebased.rebase_with_lag_cap(Some(1_000_000), 1, None); std::thread::sleep(Duration::from_millis(8)); @@ -315,7 +345,26 @@ mod tests { } #[test] + #[serial] + fn source_pts_rebasers_share_one_capture_base_across_streams() { + reset_shared_source_capture_base_for_tests(); + let microphone = SourcePtsRebaser::default(); + let camera = SourcePtsRebaser::default(); + + let first_microphone = microphone.rebase_or_now(Some(80_000), 1); + std::thread::sleep(Duration::from_millis(5)); + let first_camera = camera.rebase_or_now(Some(435_000), 1); + + assert_eq!(first_microphone.capture_base_us, first_camera.capture_base_us); + assert_eq!(first_microphone.packet_pts_us, first_camera.packet_pts_us); + assert_eq!(first_microphone.source_base_us, Some(80_000)); + assert_eq!(first_camera.source_base_us, Some(435_000)); + } + + #[test] + #[serial] fn upstream_timing_trace_flag_defaults_off_and_accepts_true_values() { + reset_shared_source_capture_base_for_tests(); temp_env::with_var_unset("LESAVKA_UPSTREAM_TIMING_TRACE", || { assert!(!upstream_timing_trace_enabled()); }); @@ -330,7 +379,9 @@ mod tests { } #[test] + #[serial] fn upstream_source_lag_cap_defaults_and_accepts_override() { + reset_shared_source_capture_base_for_tests(); temp_env::with_var_unset("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS", || { assert_eq!(upstream_source_lag_cap(), Duration::from_millis(250)); }); @@ -341,7 +392,9 @@ mod tests { } #[test] + #[serial] fn duration_paced_rebaser_advances_by_packet_duration_when_source_pts_stretch() { + reset_shared_source_capture_base_for_tests(); let rebased = DurationPacedSourcePtsRebaser::default(); let first = rebased.rebase_with_packet_duration(Some(0), 21_333, Duration::from_millis(250)); @@ -355,7 +408,9 @@ mod tests { } #[test] + #[serial] fn duration_paced_rebaser_clamps_when_duration_pacing_falls_stale() { + reset_shared_source_capture_base_for_tests(); let rebased = DurationPacedSourcePtsRebaser::default(); let _first = rebased.rebase_with_packet_duration(Some(0), 10_000, Duration::from_millis(2)); std::thread::sleep(Duration::from_millis(8)); diff --git a/client/src/sync_probe/capture.rs b/client/src/sync_probe/capture.rs index 98c2428..3a0810c 100644 --- a/client/src/sync_probe/capture.rs +++ b/client/src/sync_probe/capture.rs @@ -43,8 +43,8 @@ pub use runtime::SyncProbeCapture; #[cfg(any(not(coverage), test))] const PROBE_VIDEO_QUEUE: FreshQueueConfig = FreshQueueConfig { - capacity: 8, - max_age: Duration::from_millis(350), + capacity: 32, + max_age: Duration::from_secs(1), }; #[cfg(any(not(coverage), test))] diff --git a/client/src/sync_probe/runner.rs b/client/src/sync_probe/runner.rs index 05b426a..71fc5b1 100644 --- a/client/src/sync_probe/runner.rs +++ b/client/src/sync_probe/runner.rs @@ -48,7 +48,6 @@ async fn run_sync_probe(config: ProbeConfig) -> Result<()> { config.pulse_width, config.marker_tick_period, ); - let capture = SyncProbeCapture::new(camera, schedule, config.duration)?; tracing::info!( server = %config.server, @@ -62,6 +61,7 @@ async fn run_sync_probe(config: ProbeConfig) -> Result<()> { let video_channel = connect(config.server.as_str()).await?; let audio_channel = connect(config.server.as_str()).await?; + let capture = SyncProbeCapture::new(camera, schedule, config.duration)?; let video_queue = capture.video_queue(); let audio_queue = capture.audio_queue(); diff --git a/common/Cargo.toml b/common/Cargo.toml index b09e643..454a0e2 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.14.16" +version = "0.14.17" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 8e3d63e..bb3cf84 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.14.16" +version = "0.14.17" edition = "2024" autobins = false