diff --git a/AGENTS.md b/AGENTS.md index d2d115f..b3528d0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,5 +1,74 @@ # Lesavka Agent Notes +## 0.17.38 Bundled Webcam A/V Migration Checklist + +Context: manual Google Meet and mirrored-probe testing showed the split webcam +and microphone uplink design is too fragile under real browser/device pressure. +The new product contract is: when webcam video is present, microphone audio +travels with it on one client-owned upstream media stream. The server manages +freshness and smoothness after arrival; it no longer tries to make two racing +upstream channels look synchronized. Microphone-only remains supported as the +explicit no-camera path. + +### Product Invariants +- [x] Webcam-enabled sessions use one bundled upstream media RPC by default. +- [x] Webcam-enabled sessions imply microphone capture when the server supports UAC. +- [x] The UI-selected camera, camera quality, microphone, speaker, gain, and + enable switches remain authoritative; defaults may not override visible UI state. +- [x] Client capture timestamps are the source of A/V sync truth for webcam sessions. +- [x] Server bundled playout rebases that client timeline onto a fresh local epoch. +- [x] Server bundled playout may drop stale packets, but must not rebuild sync by + independently pairing separate camera and microphone streams. +- [x] Mic-only sessions keep the existing microphone stream path. +- [x] Legacy split webcam/mic uplink is only an explicit compatibility escape hatch. +- [ ] Manual probes and diagnostics clearly label `bundled-webcam-media` versus + `mic-only` so we never confuse the architectures during debugging. + +### Wire Protocol +- [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or + more audio packets from the same client capture timeline. +- [x] Add `StreamWebcamMedia(stream UpstreamMediaBundle)` to the relay service. +- [x] Advertise bundled support in the handshake capability set. 
+- [ ] Add compatibility tests proving older split RPCs are not used by normal + webcam sessions when bundled support is advertised. + +### Client Migration +- [x] Change session startup to prefer bundled webcam media whenever camera and + microphone are both available. +- [x] Spawn one bundled capture/uplink task instead of separate camera and mic + tasks for webcam sessions. +- [x] Bundle camera frames and microphone packets into one freshness-bounded queue. +- [x] Stamp all packets at capture/uplink enqueue before the async gRPC stream + can add misleading delay. +- [x] Preserve live UI device/profile changes by restarting the bundled capture + pipeline when selected camera, camera quality, or microphone changes. +- [ ] Make launcher diagnostics expose the active upstream mode as first-class + text rather than inferring from separate camera/mic telemetry. +- [ ] Migrate sync-probe runner to the bundled path explicitly and remove any + normal probe dependence on split `StreamCamera` + `StreamMicrophone`. + +### Server Migration +- [x] Implement `StreamWebcamMedia` and make it own both UAC and UVC/HDMI sinks + for one upstream session. +- [x] Schedule bundled packets by shared client capture timestamp instead of + startup-pairing independent streams. +- [x] Keep server freshness drops/reanchors active for bundled media. +- [x] Continue reporting client timing and sink handoff diagnostics from bundled packets. +- [ ] Add bundled-mode counters for first bundle, first audio push, first video feed, + dropped stale bundles, and bundle queue age. +- [ ] Retire split-stream planner assumptions from the webcam path after the + bundled mode passes manual Google Meet and mirrored-probe validation. + +### Validation +- [x] `cargo check -p lesavka_common -p lesavka_client -p lesavka_server --bins` +- [x] Focused handshake and launcher tests. +- [ ] Focused server upstream-media tests including bundled stream acceptance. 
+- [ ] Install on both ends and verify diagnostics show bundled webcam media. +- [ ] Manual Google Meet test: camera starts, video is not black/unsupported, + audio is intelligible, and lip sync is inside the acceptable band. +- [ ] Mirrored browser probe: recordings open in Dolphin and automated analyzer + no longer depends on fragile split-channel assumptions. + ## A/V Sync Probe And Lip-Sync Validation Checklist Context: Google Meet testing on 2026-04-30 showed audio roughly 8 seconds behind video even though internal client/server telemetry reported fresh uplink packets. Treat this as a product correctness failure, not a calibration issue. Do not resume blind lip-sync tuning until the probe can explain where delay appears. diff --git a/Cargo.lock b/Cargo.lock index 6ccc464..09ae1ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.17.37" +version = "0.17.38" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.17.37" +version = "0.17.38" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.17.37" +version = "0.17.38" dependencies = [ "anyhow", "base64", @@ -1731,6 +1731,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-stream", + "base64", "chacha20poly1305", "chrono", "evdev", diff --git a/client/Cargo.toml b/client/Cargo.toml index 6400ebd..0a66ad5 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.17.37" +version = "0.17.38" edition = "2024" [dependencies] diff --git a/client/src/app.rs b/client/src/app.rs index 8dc250b..7978032 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -19,8 +19,8 @@ use winit::{ }; use lesavka_common::lesavka::{ - AudioPacket, Empty, 
KeyboardReport, MonitorRequest, MouseReport, VideoPacket, - relay_client::RelayClient, + AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, UpstreamMediaBundle, + VideoPacket, relay_client::RelayClient, }; #[cfg(not(coverage))] diff --git a/client/src/app/session_lifecycle.rs b/client/src/app/session_lifecycle.rs index 422043b..1fa963b 100644 --- a/client/src/app/session_lifecycle.rs +++ b/client/src/app/session_lifecycle.rs @@ -62,10 +62,14 @@ impl LesavkaClientApp { let initial_cam_profile = initial_camera_profile_id_from_env(); let initial_mic_source = std::env::var("LESAVKA_MIC_SOURCE").ok(); let initial_audio_sink = std::env::var("LESAVKA_AUDIO_SINK").ok(); + let camera_enabled = caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(); + let microphone_available = caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(); + let bundled_webcam_media = + camera_enabled && microphone_available && caps.bundled_webcam_media; let media_controls = crate::live_media_control::LiveMediaControls::from_env( crate::live_media_control::MediaControlState::with_devices( - caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(), - caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(), + camera_enabled, + microphone_available || bundled_webcam_media, std::env::var("LESAVKA_AUDIO_DISABLE").is_err(), initial_cam_source.clone(), initial_cam_profile.clone(), @@ -229,31 +233,62 @@ impl LesavkaClientApp { } else { info!("๐Ÿงช headless mode: skipping video/audio renderers"); } - /*โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ camera & mic tasks (gated by caps) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€*/ - if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() { + /*โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ upstream webcam/mic tasks (gated by caps) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€*/ + if bundled_webcam_media { if let Some(cfg) = camera_cfg { info!( codec = ?cfg.codec, width = cfg.width, height = cfg.height, fps = cfg.fps, - "๐Ÿ“ธ using negotiated server UVC caps for 
emitted format; launcher quality still controls local capture" + "๐Ÿ“ฆ using bundled webcam A/V uplink; client capture owns sync, server owns freshness" ); } let ep = vid_ep.clone(); let cam_telemetry = uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera); + let mic_telemetry = + uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone); let media_controls = media_controls.clone(); - tokio::spawn(Self::cam_loop( + tokio::spawn(Self::webcam_media_loop( ep, initial_cam_source.clone(), initial_cam_profile.clone(), + initial_mic_source.clone(), camera_cfg, cam_telemetry, + mic_telemetry, media_controls, )); + } else if camera_enabled { + warn!( + "๐Ÿ“ฆ server did not advertise bundled webcam A/V; split camera uplink is disabled unless LESAVKA_LEGACY_SPLIT_UPLINK=1" + ); + if std::env::var("LESAVKA_LEGACY_SPLIT_UPLINK").is_ok() { + if let Some(cfg) = camera_cfg { + info!( + codec = ?cfg.codec, + width = cfg.width, + height = cfg.height, + fps = cfg.fps, + "๐Ÿ“ธ using legacy split camera uplink by explicit override" + ); + } + let ep = vid_ep.clone(); + let cam_telemetry = + uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera); + let media_controls = media_controls.clone(); + tokio::spawn(Self::cam_loop( + ep, + initial_cam_source.clone(), + initial_cam_profile.clone(), + camera_cfg, + cam_telemetry, + media_controls, + )); + } } - if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() { + if microphone_available && !bundled_webcam_media { let ep = vid_ep.clone(); let mic_telemetry = uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone); diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index db632ef..9f4fac0 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -1,4 +1,288 @@ impl LesavkaClientApp { + /*โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ bundled webcam + mic stream 
โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€*/ + #[cfg(not(coverage))] + async fn webcam_media_loop( + ep: Channel, + initial_camera_source: Option<String>, + initial_camera_profile: Option<String>, + initial_microphone_source: Option<String>, + camera_cfg: Option, + camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, + microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, + media_controls: crate::live_media_control::LiveMediaControls, + ) { + let mut delay = Duration::from_secs(1); + static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); + + loop { + let state = media_controls.refresh(); + let camera_requested = state.camera; + let microphone_requested = state.microphone || state.camera; + if !camera_requested && !microphone_requested { + camera_telemetry.record_enabled(false); + microphone_telemetry.record_enabled(false); + tokio::time::sleep(Duration::from_millis(100)).await; + continue; + } + + let active_camera_source = state.camera_source.resolve(initial_camera_source.as_deref()); + let active_camera_profile = + state.camera_profile.resolve(initial_camera_profile.as_deref()); + let active_microphone_source = state + .microphone_source + .resolve(initial_microphone_source.as_deref()); + let capture_profile = active_camera_profile + .as_deref() + .and_then(parse_camera_profile_id); + let use_default_microphone = matches!( + state.microphone_source, + crate::live_media_control::MediaDeviceChoice::Auto + ) && active_microphone_source.is_none(); + let setup_camera_source = active_camera_source.clone(); + let setup_microphone_source = active_microphone_source.clone(); + + let setup = tokio::task::spawn_blocking(move || { + let microphone = if use_default_microphone { + MicrophoneCapture::new_default_source() + } else { + MicrophoneCapture::new_with_source(setup_microphone_source.as_deref()) + }?; + let camera = if camera_requested { + Some(CameraCapture::new_with_capture_profile( + setup_camera_source.as_deref(), + camera_cfg, + capture_profile, + )?) 
+ } else { + None + }; + Ok::<_, anyhow::Error>((camera.map(Arc::new), Arc::new(microphone))) + }) + .await; + + let (camera, microphone) = match setup { + Ok(Ok(captures)) => captures, + Ok(Err(err)) => { + camera_telemetry.record_disconnect(format!( + "bundled webcam media setup failed: {err:#}" + )); + microphone_telemetry.record_disconnect(format!( + "bundled webcam media setup failed: {err:#}" + )); + warn!( + "๐Ÿ“ฆ bundled webcam media setup failed for camera={:?} mic={:?}: {err:#}", + active_camera_source.as_deref().unwrap_or("auto"), + active_microphone_source.as_deref().unwrap_or("auto") + ); + if camera_requested { + abort_if_required_media_source_failed( + "camera", + "๐Ÿ“ธ", + active_camera_source.as_deref(), + &err, + ); + } + abort_if_required_media_source_failed( + "microphone", + "๐ŸŽค", + active_microphone_source.as_deref(), + &err, + ); + delay = app_support::next_delay(delay); + tokio::time::sleep(delay).await; + continue; + } + Err(err) => { + camera_telemetry.record_disconnect(format!( + "bundled webcam media setup task failed: {err}" + )); + microphone_telemetry.record_disconnect(format!( + "bundled webcam media setup task failed: {err}" + )); + warn!("๐Ÿ“ฆ bundled webcam media setup task failed: {err}"); + delay = app_support::next_delay(delay); + tokio::time::sleep(delay).await; + continue; + } + }; + + camera_telemetry.record_reconnect_attempt(); + microphone_telemetry.record_reconnect_attempt(); + let mut cli = RelayClient::new(ep.clone()); + let queue: crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle> = + crate::uplink_fresh_queue::FreshPacketQueue::new(BUNDLED_MEDIA_UPLINK_QUEUE); + let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new( + "bundled-webcam-media", + "๐Ÿ“ฆ", + ))); + + let queue_stream = queue.clone(); + let camera_telemetry_stream = camera_telemetry.clone(); + let microphone_telemetry_stream = microphone_telemetry.clone(); + let drop_log_stream = Arc::clone(&drop_log); + let outbound = async_stream::stream! 
{ + loop { + let next = queue_stream.pop_fresh().await; + if next.dropped_stale > 0 { + camera_telemetry_stream.record_stale_drop(next.dropped_stale); + microphone_telemetry_stream.record_stale_drop(next.dropped_stale); + log_uplink_drop( + &drop_log_stream, + UplinkDropReason::Stale, + next.dropped_stale, + next.queue_depth, + duration_ms(next.delivery_age), + ); + } + if let Some(mut bundle) = next.packet { + let queue_depth = queue_depth_u32(next.queue_depth); + let delivery_age_ms = duration_ms(next.delivery_age); + if bundle.video.is_some() { + camera_telemetry_stream.record_streamed( + queue_depth, + delivery_age_ms, + ); + } + if !bundle.audio.is_empty() { + microphone_telemetry_stream.record_streamed( + queue_depth, + delivery_age_ms, + ); + } + attach_bundle_queue_metadata(&mut bundle, next.queue_depth, next.delivery_age); + yield bundle; + continue; + } + break; + } + }; + + match cli.stream_webcam_media(Request::new(outbound)).await { + Ok(mut resp) => { + let stop = Arc::new(AtomicBool::new(false)); + let (event_tx, event_rx) = std::sync::mpsc::channel::<BundledCaptureEvent>(); + let camera_worker = camera.as_ref().map(|camera| { + let camera = Arc::clone(camera); + let stop = Arc::clone(&stop); + let event_tx = event_tx.clone(); + let media_controls = media_controls.clone(); + let initial_camera_source = initial_camera_source.clone(); + let initial_camera_profile = initial_camera_profile.clone(); + let active_camera_source = active_camera_source.clone(); + let active_camera_profile = active_camera_profile.clone(); + std::thread::spawn(move || { + while !stop.load(Ordering::Relaxed) { + let state = media_controls.refresh(); + let desired_source = + state.camera_source.resolve(initial_camera_source.as_deref()); + let desired_profile = + state.camera_profile.resolve(initial_camera_profile.as_deref()); + if !state.camera + || desired_source != active_camera_source + || desired_profile != active_camera_profile + { + let _ = event_tx.send(BundledCaptureEvent::Restart); + break; 
+ } + if let Some(mut pkt) = camera.pull() { + let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt); + if event_tx.send(BundledCaptureEvent::Video(pkt)).is_err() { + break; + } + } + } + }) + }); + let microphone_worker = { + let microphone = Arc::clone(&microphone); + let stop = Arc::clone(&stop); + let event_tx = event_tx.clone(); + let media_controls = media_controls.clone(); + let initial_microphone_source = initial_microphone_source.clone(); + let active_microphone_source = active_microphone_source.clone(); + let active_camera_requested = camera_requested; + std::thread::spawn(move || { + while !stop.load(Ordering::Relaxed) { + let state = media_controls.refresh(); + let desired_source = state + .microphone_source + .resolve(initial_microphone_source.as_deref()); + if state.camera != active_camera_requested + || !(state.microphone || state.camera) + || desired_source != active_microphone_source + { + let _ = event_tx.send(BundledCaptureEvent::Restart); + break; + } + if let Some(mut pkt) = microphone.pull() { + let _ = stamp_audio_timing_metadata_at_enqueue(&mut pkt); + if event_tx.send(BundledCaptureEvent::Audio(pkt)).is_err() { + break; + } + } + } + }) + }; + drop(event_tx); + + let bundle_worker = { + let stop = Arc::clone(&stop); + let queue = queue.clone(); + let camera_telemetry = camera_telemetry.clone(); + let microphone_telemetry = microphone_telemetry.clone(); + let drop_log = Arc::clone(&drop_log); + std::thread::spawn(move || { + bundle_captured_media( + event_rx, + stop, + queue, + camera_telemetry, + microphone_telemetry, + drop_log, + ); + }) + }; + + delay = Duration::from_secs(1); + camera_telemetry.record_connected(); + microphone_telemetry.record_connected(); + while resp.get_mut().message().await.transpose().is_some() {} + camera_telemetry.record_disconnect("bundled webcam media stream ended"); + microphone_telemetry.record_disconnect("bundled webcam media stream ended"); + stop.store(true, Ordering::Relaxed); + queue.close(); + if let 
Some(worker) = camera_worker { + let _ = worker.join(); + } + let _ = microphone_worker.join(); + let _ = bundle_worker.join(); + } + Err(e) if e.code() == tonic::Code::Unimplemented => { + camera_telemetry.record_disconnect("bundled webcam media unavailable on server"); + microphone_telemetry + .record_disconnect("bundled webcam media unavailable on server"); + warn!("๐Ÿ“ฆ server does not support bundled webcam media โ€“ retrying"); + delay = app_support::next_delay(delay); + } + Err(e) => { + camera_telemetry + .record_disconnect(format!("bundled webcam media connect failed: {e}")); + microphone_telemetry + .record_disconnect(format!("bundled webcam media connect failed: {e}")); + if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { + warn!("โŒ๐Ÿ“ฆ bundled webcam media connect failed: {e}"); + } else { + debug!("โŒ๐Ÿ“ฆ bundled webcam media reconnect failed: {e}"); + } + delay = app_support::next_delay(delay); + } + } + + queue.close(); + tokio::time::sleep(delay).await; + } + } + /*โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ mic stream โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€*/ #[cfg(not(coverage))] async fn voice_loop( @@ -485,6 +769,200 @@ const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = policy: crate::uplink_fresh_queue::FreshQueuePolicy::DrainOldest, }; +#[cfg(not(coverage))] +const BUNDLED_MEDIA_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = + crate::uplink_fresh_queue::FreshQueueConfig { + capacity: 16, + max_age: Duration::from_millis(350), + policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly, + }; + +#[cfg(not(coverage))] +const BUNDLED_AUDIO_FLUSH_INTERVAL: Duration = Duration::from_millis(20); + +#[cfg(not(coverage))] +const BUNDLED_AUDIO_MAX_PENDING: usize = 8; + +#[cfg(not(coverage))] +#[derive(Debug)] +enum BundledCaptureEvent { + Audio(AudioPacket), + Video(VideoPacket), + Restart, +} + +#[cfg(not(coverage))] +fn bundle_captured_media( + event_rx: std::sync::mpsc::Receiver<BundledCaptureEvent>, + stop: 
Arc<AtomicBool>, + queue: crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle>, + camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, + microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, + drop_log: Arc<std::sync::Mutex<UplinkDropLogLimiter>>, +) { + static BUNDLED_SESSION: AtomicU64 = AtomicU64::new(0); + let session_id = BUNDLED_SESSION + .fetch_add(1, Ordering::Relaxed) + .saturating_add(1); + let mut bundle_seq = 0_u64; + let mut pending_audio = Vec::new(); + let mut next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; + + loop { + if stop.load(Ordering::Relaxed) { + break; + } + let timeout = next_audio_flush.saturating_duration_since(Instant::now()); + match event_rx.recv_timeout(timeout) { + Ok(BundledCaptureEvent::Audio(packet)) => { + pending_audio.push(packet); + if pending_audio.len() >= BUNDLED_AUDIO_MAX_PENDING { + emit_bundled_media( + session_id, + &mut bundle_seq, + None, + std::mem::take(&mut pending_audio), + &queue, + &camera_telemetry, + &microphone_telemetry, + &drop_log, + ); + next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; + } + } + Ok(BundledCaptureEvent::Video(packet)) => { + emit_bundled_media( + session_id, + &mut bundle_seq, + Some(packet), + std::mem::take(&mut pending_audio), + &queue, + &camera_telemetry, + &microphone_telemetry, + &drop_log, + ); + next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; + } + Ok(BundledCaptureEvent::Restart) => { + stop.store(true, Ordering::Relaxed); + break; + } + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + if !pending_audio.is_empty() { + emit_bundled_media( + session_id, + &mut bundle_seq, + None, + std::mem::take(&mut pending_audio), + &queue, + &camera_telemetry, + &microphone_telemetry, + &drop_log, + ); + } + next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; + } + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, + } + } + queue.close(); +} + +#[cfg(not(coverage))] +fn emit_bundled_media( + session_id: u64, + bundle_seq: &mut u64, + video: Option<VideoPacket>, + 
audio: Vec<AudioPacket>, + queue: &crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle>, + camera_telemetry: &crate::uplink_telemetry::UplinkTelemetryHandle, + microphone_telemetry: &crate::uplink_telemetry::UplinkTelemetryHandle, + drop_log: &Arc<std::sync::Mutex<UplinkDropLogLimiter>>, +) { + if video.is_none() && audio.is_empty() { + return; + } + *bundle_seq = bundle_seq.saturating_add(1); + let (capture_start_us, capture_end_us) = bundled_capture_bounds(video.as_ref(), &audio); + let enqueue_now_us = crate::live_capture_clock::capture_pts_us(); + let enqueue_age = Duration::from_micros(enqueue_now_us.saturating_sub(capture_start_us)); + let has_video = video.is_some(); + let has_audio = !audio.is_empty(); + let mut bundle = UpstreamMediaBundle { + session_id, + seq: *bundle_seq, + capture_start_us, + capture_end_us, + video, + audio, + ..UpstreamMediaBundle::default() + }; + attach_bundle_queue_metadata(&mut bundle, 0, enqueue_age); + let stats = queue.push(bundle, enqueue_age); + if stats.dropped_queue_full > 0 { + if has_video { + camera_telemetry.record_queue_full_drop(stats.dropped_queue_full); + } + if has_audio { + microphone_telemetry.record_queue_full_drop(stats.dropped_queue_full); + } + log_uplink_drop( + drop_log, + UplinkDropReason::QueueFull, + stats.dropped_queue_full, + stats.queue_depth, + duration_ms(enqueue_age), + ); + } + let queue_depth = queue_depth_u32(stats.queue_depth); + let age_ms = duration_ms(enqueue_age); + if has_video { + camera_telemetry.record_enqueue(queue_depth, age_ms, 0.0); + } + if has_audio { + microphone_telemetry.record_enqueue(queue_depth, age_ms, 0.0); + } +} + +#[cfg(not(coverage))] +fn bundled_capture_bounds(video: Option<&VideoPacket>, audio: &[AudioPacket]) -> (u64, u64) { + let mut start = u64::MAX; + let mut end = 0_u64; + if let Some(video) = video { + let pts = packet_video_capture_pts_us(video); + start = start.min(pts); + end = end.max(pts); + } + for packet in audio { + let pts = packet_audio_capture_pts_us(packet); + start = start.min(pts); + end = end.max(pts); + 
} + if start == u64::MAX { + let now = crate::live_capture_clock::capture_pts_us(); + return (now, now); + } + (start, end.max(start)) +} + +#[cfg(not(coverage))] +fn packet_audio_capture_pts_us(packet: &AudioPacket) -> u64 { + if packet.client_capture_pts_us == 0 { + packet.pts + } else { + packet.client_capture_pts_us + } +} + +#[cfg(not(coverage))] +fn packet_video_capture_pts_us(packet: &VideoPacket) -> u64 { + if packet.client_capture_pts_us == 0 { + packet.pts + } else { + packet.client_capture_pts_us + } +} + #[cfg(not(coverage))] fn queue_depth_u32(depth: usize) -> u32 { depth.try_into().unwrap_or(u32::MAX) @@ -553,6 +1031,20 @@ fn attach_video_queue_metadata( packet.client_queue_age_ms = duration_ms_u32(delivery_age); } +#[cfg(not(coverage))] +fn attach_bundle_queue_metadata( + bundle: &mut UpstreamMediaBundle, + queue_depth: usize, + delivery_age: Duration, +) { + for packet in &mut bundle.audio { + attach_audio_queue_metadata(packet, queue_depth, delivery_age); + } + if let Some(packet) = bundle.video.as_mut() { + attach_video_queue_metadata(packet, queue_depth, delivery_age); + } +} + #[cfg(not(coverage))] #[derive(Clone, Copy, Debug)] enum UplinkDropReason { diff --git a/client/src/app_support.rs b/client/src/app_support.rs index 9c96dd3..81b8300 100644 --- a/client/src/app_support.rs +++ b/client/src/app_support.rs @@ -97,6 +97,7 @@ mod tests { let mut caps = PeerCaps { camera: true, microphone: false, + bundled_webcam_media: false, server_version: None, camera_output: Some(String::from("uvc")), camera_codec: Some(String::from("mjpeg")), diff --git a/client/src/bin/lesavka-relayctl.rs b/client/src/bin/lesavka-relayctl.rs index f473c99..298a2e1 100644 --- a/client/src/bin/lesavka-relayctl.rs +++ b/client/src/bin/lesavka-relayctl.rs @@ -247,6 +247,7 @@ fn print_versions(server_addr: &str, caps: &HandshakeSet) { println!("server_revision={server_revision}"); println!("server_camera_output={}", caps.camera_output); println!("server_camera_codec={}", 
caps.camera_codec); + println!("server_bundled_webcam_media={}", caps.bundled_webcam_media); } fn print_upstream_sync(state: lesavka_common::lesavka::UpstreamSyncState) { diff --git a/client/src/handshake.rs b/client/src/handshake.rs index a0b521b..12cf3cd 100644 --- a/client/src/handshake.rs +++ b/client/src/handshake.rs @@ -12,6 +12,7 @@ use tracing::{info, warn}; pub struct PeerCaps { pub camera: bool, pub microphone: bool, + pub bundled_webcam_media: bool, pub server_version: Option<String>, pub camera_output: Option<String>, pub camera_codec: Option<String>, @@ -73,6 +74,7 @@ pub async fn negotiate(uri: &str) -> PeerCaps { PeerCaps { camera: rsp.camera, microphone: rsp.microphone, + bundled_webcam_media: rsp.bundled_webcam_media, server_version: (!rsp.server_version.is_empty()) .then_some(rsp.server_version.clone()), camera_output: (!rsp.camera_output.is_empty()).then_some(rsp.camera_output.clone()), @@ -119,6 +121,7 @@ pub async fn probe(uri: &str) -> HandshakeProbe { caps: PeerCaps { camera: rsp.camera, microphone: rsp.microphone, + bundled_webcam_media: rsp.bundled_webcam_media, server_version: (!rsp.server_version.is_empty()) .then_some(rsp.server_version.clone()), camera_output: (!rsp.camera_output.is_empty()) @@ -196,6 +199,7 @@ pub async fn negotiate(uri: &str) -> PeerCaps { let caps = PeerCaps { camera: rsp.camera, microphone: rsp.microphone, + bundled_webcam_media: rsp.bundled_webcam_media, server_version: if rsp.server_version.is_empty() { None } else { @@ -301,6 +305,7 @@ pub async fn probe(uri: &str) -> HandshakeProbe { let caps = PeerCaps { camera: rsp.camera, microphone: rsp.microphone, + bundled_webcam_media: rsp.bundled_webcam_media, server_version: (!rsp.server_version.is_empty()) .then_some(rsp.server_version.clone()), camera_output: (!rsp.camera_output.is_empty()) diff --git a/client/src/launcher/tests/preview.rs b/client/src/launcher/tests/preview.rs index a63e333..b184bc9 100644 --- a/client/src/launcher/tests/preview.rs +++ b/client/src/launcher/tests/preview.rs 
@@ -45,6 +45,8 @@ impl Relay for ProbeRelay { Pin<Box<dyn Stream<Item = Result<Empty, Status>> + Send>>; type StreamCameraStream = Pin<Box<dyn Stream<Item = Result<Empty, Status>> + Send>>; + type StreamWebcamMediaStream = + Pin<Box<dyn Stream<Item = Result<Empty, Status>> + Send>>; async fn stream_keyboard( &self, @@ -90,6 +92,13 @@ impl Relay for ProbeRelay { Ok(Response::new(Box::pin(stream::empty()))) } + async fn stream_webcam_media( + &self, + _request: Request<Streaming<UpstreamMediaBundle>>, + ) -> Result<Response<Self::StreamWebcamMediaStream>, Status> { + Ok(Response::new(Box::pin(stream::empty()))) + } + async fn paste_text( &self, _request: Request<PasteRequest>, diff --git a/client/src/launcher/tests/utility_actions.rs b/client/src/launcher/tests/utility_actions.rs index 423e585..7be6420 100644 --- a/client/src/launcher/tests/utility_actions.rs +++ b/client/src/launcher/tests/utility_actions.rs @@ -6,7 +6,7 @@ use futures::stream; use lesavka_common::lesavka::{ AudioPacket, CalibrationRequest, CalibrationState, CapturePowerState, Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest, - UpstreamSyncState, VideoPacket, + UpstreamMediaBundle, UpstreamSyncState, VideoPacket, relay_server::{Relay, RelayServer}, }; use serial_test::serial; @@ -48,6 +48,7 @@ impl Relay for UtilityRelay { type CaptureAudioStream = AudioStream; type StreamMicrophoneStream = EmptyStream; type StreamCameraStream = EmptyStream; + type StreamWebcamMediaStream = EmptyStream; async fn stream_keyboard( &self, @@ -91,6 +92,13 @@ impl Relay for UtilityRelay { Ok(Response::new(Box::pin(stream::empty()))) } + async fn stream_webcam_media( + &self, + _request: Request<Streaming<UpstreamMediaBundle>>, + ) -> Result<Response<Self::StreamWebcamMediaStream>, Status> { + Ok(Response::new(Box::pin(stream::empty()))) + } + async fn paste_text( &self, _request: Request<PasteRequest>, diff --git a/common/Cargo.toml b/common/Cargo.toml index 8bd7bc8..ce21c74 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.17.37" +version = "0.17.38" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 107121c..47367d9 100644 
--- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -46,6 +46,23 @@ message AudioPacket { uint32 client_queue_age_ms = 8; } +message UpstreamMediaBundle { + // One client-owned webcam/microphone capture batch. When a webcam is active, + // this is the authoritative upstream transport so audio and video share one + // ordered gRPC stream instead of racing through independent channels. + uint64 session_id = 1; + uint64 seq = 2; + uint64 capture_start_us = 3; + uint64 capture_end_us = 4; + VideoPacket video = 5; + repeated AudioPacket audio = 6; + uint32 audio_sample_rate = 7; + uint32 audio_channels = 8; + uint32 video_width = 9; + uint32 video_height = 10; + uint32 video_fps = 11; +} + message ResetUsbReply { bool ok = 1; } // true = success message PasteRequest { @@ -157,6 +174,7 @@ message HandshakeSet { uint32 eye_fps = 10; string server_version = 11; string server_revision = 12; + bool bundled_webcam_media = 13; } message Empty {} @@ -168,6 +186,7 @@ service Relay { rpc CaptureAudio (MonitorRequest) returns (stream AudioPacket); rpc StreamMicrophone (stream AudioPacket) returns (stream Empty); rpc StreamCamera (stream VideoPacket) returns (stream Empty); + rpc StreamWebcamMedia(stream UpstreamMediaBundle) returns (stream Empty); rpc PasteText (PasteRequest) returns (PasteReply); rpc RecoverUsb (Empty) returns (ResetUsbReply); diff --git a/server/Cargo.toml b/server/Cargo.toml index 558d588..673bb64 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.17.37" +version = "0.17.38" edition = "2024" autobins = false diff --git a/server/src/handshake.rs b/server/src/handshake.rs index ba569c6..09cea36 100644 --- a/server/src/handshake.rs +++ b/server/src/handshake.rs @@ -35,6 +35,7 @@ impl Handshake for HandshakeSvc { eye_fps, server_version: crate::VERSION.to_string(), server_revision: crate::REVISION.to_string(), + bundled_webcam_media: camera_enabled && microphone, 
})) } } diff --git a/server/src/main.rs b/server/src/main.rs index 7be9667..f144b99 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -20,7 +20,7 @@ use tracing::{debug, error, info, warn}; use lesavka_common::lesavka::{ AudioPacket, CalibrationRequest, CalibrationState, CapturePowerCommand, CapturePowerState, Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, - SetCapturePowerRequest, UpstreamSyncState, VideoPacket, + SetCapturePowerRequest, UpstreamMediaBundle, UpstreamSyncState, VideoPacket, relay_server::{Relay, RelayServer}, }; diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 937df4f..5e884f4 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -1,3 +1,67 @@ +#[cfg(not(coverage))] +#[derive(Debug)] +enum BundledUpstreamEvent { + Audio(AudioPacket), + Video(VideoPacket), +} + +#[cfg(not(coverage))] +impl BundledUpstreamEvent { + fn remote_pts_us(&self) -> u64 { + match self { + Self::Audio(packet) => packet.pts, + Self::Video(packet) => packet.pts, + } + } + + fn kind(&self) -> UpstreamMediaKind { + match self { + Self::Audio(_) => UpstreamMediaKind::Microphone, + Self::Video(_) => UpstreamMediaKind::Camera, + } + } +} + +#[cfg(not(coverage))] +#[derive(Clone, Copy, Debug, Default)] +struct BundledPlayoutClock { + base_remote_pts_us: Option<u64>, + epoch: Option<tokio::time::Instant>, +} + +#[cfg(not(coverage))] +impl BundledPlayoutClock { + fn ensure( + &mut self, + bundle: &UpstreamMediaBundle, + events: &[BundledUpstreamEvent], + ) -> Option<(u64, tokio::time::Instant)> { + if self.base_remote_pts_us.is_none() || self.epoch.is_none() { + let base = if bundle.capture_start_us != 0 { + bundle.capture_start_us + } else { + events.iter().map(BundledUpstreamEvent::remote_pts_us).min()? 
+ }; + self.base_remote_pts_us = Some(base); + self.epoch = Some(tokio::time::Instant::now() + bundled_upstream_playout_delay()); + } + Some(( + self.base_remote_pts_us.unwrap_or_default(), + self.epoch.expect("bundled epoch initialized"), + )) + } +} + +#[cfg(not(coverage))] +fn bundled_upstream_playout_delay() -> Duration { + std::env::var("LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS") + .or_else(|_| std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS")) + .ok() + .and_then(|value| value.trim().parse::().ok()) + .map(Duration::from_millis) + .unwrap_or_else(|| Duration::from_millis(350)) +} + #[cfg(not(coverage))] #[tonic::async_trait] impl Relay for Handler { @@ -7,6 +71,7 @@ impl Relay for Handler { type CaptureAudioStream = AudioStream; type StreamMicrophoneStream = ReceiverStream>; type StreamCameraStream = ReceiverStream>; + type StreamWebcamMediaStream = ReceiverStream>; async fn stream_keyboard( &self, @@ -102,6 +167,210 @@ impl Relay for Handler { Ok(Response::new(ReceiverStream::new(rx))) } + /// Accept client-bundled webcam and microphone packets on one upstream clock. 
+ async fn stream_webcam_media( + &self, + req: Request>, + ) -> Result, Status> { + let rpc_id = runtime_support::next_stream_id(); + let camera_cfg = camera::current_camera_config(); + let microphone_lease = self.upstream_media_rt.activate_microphone(); + let camera_lease = self.upstream_media_rt.activate_camera(); + info!( + rpc_id, + session_id = camera_lease.session_id, + camera_generation = camera_lease.generation, + microphone_generation = microphone_lease.generation, + output = camera_cfg.output.as_str(), + codec = camera_cfg.codec.as_str(), + width = camera_cfg.width, + height = camera_cfg.height, + fps = camera_cfg.fps, + "๐Ÿ“ฆ stream_webcam_media opened" + ); + let Some(microphone_sink_permit) = self + .upstream_media_rt + .reserve_microphone_sink(microphone_lease.generation) + .await + else { + self.upstream_media_rt + .close_camera(camera_lease.generation); + self.upstream_media_rt + .close_microphone(microphone_lease.generation); + return Err(Status::aborted( + "bundled webcam media stream superseded before microphone sink became available", + )); + }; + let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); + let mut sink = runtime_support::open_voice_with_retry(&uac_dev) + .await + .map_err(|e| { + self.upstream_media_rt + .close_camera(camera_lease.generation); + self.upstream_media_rt + .close_microphone(microphone_lease.generation); + Status::internal(format!("{e:#}")) + })?; + let (camera_session_id, relay, _relay_reused) = + match self.camera_rt.activate(&camera_cfg).await { + Ok(active) => active, + Err(err) => { + sink.finish(); + self.upstream_media_rt + .close_camera(camera_lease.generation); + self.upstream_media_rt + .close_microphone(microphone_lease.generation); + return Err(err); + } + }; + let camera_rt = self.camera_rt.clone(); + let upstream_media_rt = self.upstream_media_rt.clone(); + let frame_step_us = (1_000_000u64 / u64::from(camera_cfg.fps.max(1))).max(1); + let stale_drop_budget = 
upstream_stale_drop_budget(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + + tokio::spawn(async move { + let _microphone_sink_permit = microphone_sink_permit; + let mut inbound = req.into_inner(); + let mut clock = BundledPlayoutClock::default(); + let mut outcome = "aborted"; + 'bundled_loop: loop { + let bundle = match inbound.next().await { + Some(Ok(bundle)) => bundle, + Some(Err(err)) => { + warn!( + rpc_id, + session_id = camera_lease.session_id, + "๐Ÿ“ฆ stream_webcam_media inbound error before clean EOF: {err}" + ); + break; + } + None => { + outcome = "closed"; + break; + } + }; + if !camera_rt.is_active(camera_session_id) + || !upstream_media_rt.is_camera_active(camera_lease.generation) + || !upstream_media_rt.is_microphone_active(microphone_lease.generation) + { + outcome = "superseded"; + break; + } + let mut events = Vec::with_capacity(bundle.audio.len() + 1); + if let Some(video) = bundle.video.clone() { + upstream_media_rt + .record_client_timing(UpstreamMediaKind::Camera, video_client_timing(&video)); + events.push(BundledUpstreamEvent::Video(video)); + } + for audio in &bundle.audio { + upstream_media_rt.record_client_timing( + UpstreamMediaKind::Microphone, + audio_client_timing(audio), + ); + events.push(BundledUpstreamEvent::Audio(audio.clone())); + } + if events.is_empty() { + continue; + } + events.sort_by_key(BundledUpstreamEvent::remote_pts_us); + let Some((base_remote_pts_us, epoch)) = clock.ensure(&bundle, &events) else { + continue; + }; + for event in events { + let kind = event.kind(); + let min_step_us = match kind { + UpstreamMediaKind::Camera => frame_step_us, + UpstreamMediaKind::Microphone => 1, + }; + let plan = match upstream_media_rt.plan_bundled_pts( + kind, + event.remote_pts_us(), + min_step_us, + base_remote_pts_us, + epoch, + ) { + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => continue, + 
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(reason) => { + tracing::warn!( + rpc_id, + session_id = camera_lease.session_id, + ?kind, + reason, + "๐Ÿ“ฆ bundled upstream packet dropped by freshness planner" + ); + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => continue, + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(reason) => { + tracing::error!( + rpc_id, + session_id = camera_lease.session_id, + reason, + "๐Ÿ“ฆ bundled upstream startup failed" + ); + break 'bundled_loop; + } + }; + if plan.late_by > stale_drop_budget { + tracing::warn!( + rpc_id, + session_id = camera_lease.session_id, + ?kind, + late_by_ms = plan.late_by.as_millis(), + pts = plan.local_pts_us, + "๐Ÿ“ฆ bundled upstream packet dropped after missing freshness budget" + ); + continue; + } + tokio::time::sleep_until(plan.due_at).await; + let actual_late_by = tokio::time::Instant::now() + .checked_duration_since(plan.due_at) + .unwrap_or_default(); + if actual_late_by > stale_drop_budget { + tracing::warn!( + rpc_id, + session_id = camera_lease.session_id, + ?kind, + late_by_ms = actual_late_by.as_millis(), + pts = plan.local_pts_us, + "๐Ÿ“ฆ bundled upstream packet dropped after waking too late" + ); + continue; + } + match event { + BundledUpstreamEvent::Audio(mut packet) => { + packet.pts = plan.local_pts_us; + sink.push(&packet); + upstream_media_rt.mark_audio_presented(packet.pts, plan.due_at); + } + BundledUpstreamEvent::Video(mut packet) => { + packet.pts = plan.local_pts_us; + let presented_pts = packet.pts; + relay.feed(packet); + upstream_media_rt.mark_video_presented(presented_pts, plan.due_at); + } + } + } + } + sink.finish(); + upstream_media_rt.close_camera(camera_lease.generation); + upstream_media_rt.close_microphone(microphone_lease.generation); + info!( + rpc_id, + session_id = camera_lease.session_id, + camera_session_id, + outcome, + "๐Ÿ“ฆ stream_webcam_media lifecycle ended" + 
); + tx.send(Ok(Empty {})).await.ok(); + Ok::<(), Status>(()) + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } + /// Accept synthetic upstream microphone packets without ALSA hardware. async fn stream_microphone( &self, diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index 38215c4..a2212f2 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -45,6 +45,7 @@ impl Relay for Handler { type CaptureAudioStream = AudioStream; type StreamMicrophoneStream = ReceiverStream>; type StreamCameraStream = ReceiverStream>; + type StreamWebcamMediaStream = ReceiverStream>; async fn stream_keyboard( &self, @@ -186,6 +187,40 @@ impl Relay for Handler { Ok(Response::new(ReceiverStream::new(rx))) } + async fn stream_webcam_media( + &self, + req: Request>, + ) -> Result, Status> { + let microphone_lease = self.upstream_media_rt.activate_microphone(); + let camera_lease = self.upstream_media_rt.activate_camera(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + let upstream_media_rt = self.upstream_media_rt.clone(); + + tokio::spawn(async move { + let mut inbound = req.into_inner(); + while let Some(bundle) = inbound.next().await.transpose()? 
{ + if let Some(video) = bundle.video { + upstream_media_rt.record_client_timing( + UpstreamMediaKind::Camera, + video_client_timing(&video), + ); + } + for audio in bundle.audio { + upstream_media_rt.record_client_timing( + UpstreamMediaKind::Microphone, + audio_client_timing(&audio), + ); + } + } + upstream_media_rt.close_camera(camera_lease.generation); + upstream_media_rt.close_microphone(microphone_lease.generation); + let _ = tx.send(Ok(Empty {})).await; + Ok::<(), Status>(()) + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } + async fn stream_camera( &self, req: Request>, diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 9b84d3b..b967bb5 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -292,6 +292,142 @@ impl UpstreamMediaRuntime { self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) } + /// Schedule a packet from the bundled webcam/microphone transport. + /// + /// Inputs: the media kind, client capture timestamp, packet cadence floor, + /// and the client-owned bundle epoch chosen for this gRPC stream. + /// Outputs: the server playout decision for that packet. + /// Why: bundled webcam media has already been synchronized on the client, + /// so the server should not re-solve cross-stream startup pairing. It only + /// rebases the shared client clock onto a fresh local playout epoch. 
+    #[must_use]
+    pub fn plan_bundled_pts(
+        &self,
+        kind: UpstreamMediaKind,
+        remote_pts_us: u64,
+        min_step_us: u64,
+        bundle_base_remote_pts_us: u64,
+        bundle_epoch: Instant,
+    ) -> UpstreamPlanDecision {
+        let mut state = self
+            .state
+            .lock()
+            .expect("upstream media state mutex poisoned");
+        let session_id = state.session_id;
+        match kind {
+            UpstreamMediaKind::Camera => {
+                state.camera_packet_count = state.camera_packet_count.saturating_add(1);
+                state
+                    .first_camera_remote_pts_us
+                    .get_or_insert(remote_pts_us);
+                state.camera_startup_ready = true;
+            }
+            UpstreamMediaKind::Microphone => {
+                state.microphone_packet_count = state.microphone_packet_count.saturating_add(1);
+                state
+                    .first_microphone_remote_pts_us
+                    .get_or_insert(remote_pts_us);
+            }
+        }
+        update_latest_remote_pts(&mut state, kind, remote_pts_us);
+        if state.session_base_remote_pts_us.is_none() {
+            state.session_base_remote_pts_us = Some(bundle_base_remote_pts_us);
+            state.playout_epoch = Some(bundle_epoch);
+            state.pairing_anchor_deadline = Some(bundle_epoch);
+            state.phase = UpstreamSyncPhase::Syncing;
+            state.last_reason = "client-bundled upstream media epoch established".to_string();
+            self.pairing_state_notify.notify_waiters();
+            info!(
+                session_id,
+                bundle_base_remote_pts_us, "client-bundled upstream media epoch established"
+            );
+        }
+
+        let session_base_remote_pts_us = state
+            .session_base_remote_pts_us
+            .unwrap_or(bundle_base_remote_pts_us);
+        if remote_pts_us < session_base_remote_pts_us {
+            return UpstreamPlanDecision::DropBeforeOverlap;
+        }
+
+        let max_live_lag = upstream_max_live_lag();
+        let source_lag = source_lag_for_kind(&state, kind, remote_pts_us);
+        if source_lag > max_live_lag {
+            match kind {
+                UpstreamMediaKind::Camera => {
+                    state.stale_video_drops = state.stale_video_drops.saturating_add(1);
+                    state.video_freezes = state.video_freezes.saturating_add(1);
+                    state.last_reason =
+                        "dropped stale bundled video beyond max live lag".to_string();
+                }
+                UpstreamMediaKind::Microphone => {
+                    state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
+                    state.last_reason =
+                        "dropped stale bundled audio beyond max live lag".to_string();
+                }
+            }
+            state.phase = UpstreamSyncPhase::Healing;
+            return UpstreamPlanDecision::DropStale("bundled packet exceeded max live lag");
+        }
+
+        let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us);
+        let last_slot = match kind {
+            UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
+            UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us,
+        };
+        if let Some(last_pts_us) = *last_slot
+            && local_pts_us <= last_pts_us
+        {
+            local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
+        }
+        *last_slot = Some(local_pts_us);
+
+        let sink_offset_us = self.playout_offset_us(kind);
+        let epoch = state.playout_epoch.unwrap_or(bundle_epoch);
+        let mut due_at =
+            apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us);
+        let now = Instant::now();
+        let mut late_by = now.checked_duration_since(due_at).unwrap_or_default();
+        let playout_delay = upstream_playout_delay().min(max_live_lag);
+        let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay);
+        if late_by > reanchor_threshold {
+            let desired_due_at = now + playout_delay;
+            let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us);
+            let recovered_epoch = unoffset_due_at
+                .checked_sub(Duration::from_micros(local_pts_us))
+                .unwrap_or(unoffset_due_at);
+            state.playout_epoch = Some(recovered_epoch);
+            state.pairing_anchor_deadline = Some(desired_due_at);
+            state.freshness_reanchors = state.freshness_reanchors.saturating_add(1);
+            state.phase = UpstreamSyncPhase::Healing;
+            state.last_reason =
+                "reanchored bundled upstream playhead to preserve freshness".to_string();
+            due_at = apply_playout_offset(
+                recovered_epoch + Duration::from_micros(local_pts_us),
+                sink_offset_us,
+            );
+            late_by = now.checked_duration_since(due_at).unwrap_or_default();
+            info!(
+                session_id,
+                ?kind,
+                local_pts_us,
+                remote_pts_us,
+                recovery_buffer_ms = playout_delay.as_millis(),
+                "bundled upstream media playhead reanchored to preserve freshness"
+            );
+        }
+
+        if kind == UpstreamMediaKind::Microphone {
+            self.audio_progress_notify.notify_waiters();
+        }
+        UpstreamPlanDecision::Play(PlannedUpstreamPacket {
+            local_pts_us,
+            due_at,
+            late_by,
+            source_lag,
+        })
+    }
+
     /// Hold video until the audio master has at least reached the same capture
     /// moment, or until the bounded sync grace is exhausted.
     pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool {
diff --git a/server/src/upstream_media_runtime/tests/planning.rs b/server/src/upstream_media_runtime/tests/planning.rs
index ef898ea..456a493 100644
--- a/server/src/upstream_media_runtime/tests/planning.rs
+++ b/server/src/upstream_media_runtime/tests/planning.rs
@@ -36,6 +36,46 @@ fn shared_playout_epoch_is_reused_across_audio_and_video() {
     );
 }
 
+#[test]
+#[serial(upstream_media_runtime)]
+fn bundled_media_uses_client_epoch_without_pairing_wait() {
+    temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
+        let runtime = runtime_without_offsets();
+        let _camera = runtime.activate_camera();
+        let _microphone = runtime.activate_microphone();
+        let epoch = tokio::time::Instant::now();
+
+        let audio = play(runtime.plan_bundled_pts(
+            super::UpstreamMediaKind::Microphone,
+            1_000_000,
+            1,
+            1_000_000,
+            epoch,
+        ));
+        let video = play(runtime.plan_bundled_pts(
+            super::UpstreamMediaKind::Camera,
+            1_033_333,
+            16_666,
+            1_000_000,
+            epoch,
+        ));
+
+        assert_eq!(audio.local_pts_us, 0);
+        assert_eq!(video.local_pts_us, 33_333);
+        assert_eq!(
+            video.due_at.saturating_duration_since(audio.due_at),
+            Duration::from_micros(33_333)
+        );
+        let snapshot = runtime.snapshot();
+        assert_eq!(snapshot.latest_microphone_remote_pts_us, Some(1_000_000));
+        assert_eq!(snapshot.latest_camera_remote_pts_us, Some(1_033_333));
+        assert_eq!(
+            snapshot.last_reason,
+            "client-bundled upstream media epoch established"
+        );
+    });
+}
+
 #[test]
 #[serial(upstream_media_runtime)]
 fn pairing_window_holds_one_sided_playout_by_default() {
diff --git a/testing/Cargo.toml b/testing/Cargo.toml
index 1a5deb6..57fdaa3 100644
--- a/testing/Cargo.toml
+++ b/testing/Cargo.toml
@@ -12,6 +12,7 @@ path = "src/lib.rs"
 [dev-dependencies]
 anyhow = "1.0"
 async-stream = "0.3"
+base64 = "0.22"
 chrono = "0.4"
 evdev = "0.13"
 futures-util = "0.3"
diff --git a/testing/tests/client_app_include_contract.rs b/testing/tests/client_app_include_contract.rs
index f5ac4a8..c3f7d09 100644
--- a/testing/tests/client_app_include_contract.rs
+++ b/testing/tests/client_app_include_contract.rs
@@ -12,6 +12,7 @@ mod handshake {
     pub struct PeerCaps {
         pub camera: bool,
         pub microphone: bool,
+        pub bundled_webcam_media: bool,
         pub server_version: Option<String>,
     }
 
@@ -19,6 +20,7 @@
         PeerCaps {
             camera: std::env::var("LESAVKA_TEST_CAP_CAMERA").is_ok(),
            microphone: std::env::var("LESAVKA_TEST_CAP_MIC").is_ok(),
+            bundled_webcam_media: std::env::var("LESAVKA_TEST_CAP_BUNDLED").is_ok(),
             server_version: None,
         }
     }
@@ -93,7 +95,7 @@ mod relay_transport {
 
 mod input {
     pub mod camera {
-        use crate::app_support::CameraConfig;
+        pub use crate::app_support::CameraConfig;
         use lesavka_common::lesavka::VideoPacket;
 
         pub struct CameraCapture;
@@ -103,6 +105,14 @@
             Ok(Self)
         }
 
+        pub fn new_with_capture_profile(
+            source: Option<&str>,
+            cfg: Option<CameraConfig>,
+            _profile: Option<(u32, u32, u32)>,
+        ) -> anyhow::Result<Self> {
+            Self::new(source, cfg)
+        }
+
         pub fn pull(&self) -> Option<VideoPacket> {
             None
         }
@@ -119,6 +129,14 @@
             Ok(Self)
         }
 
+        pub fn new_default_source() -> anyhow::Result<Self> {
+            Self::new()
+        }
+
+        pub fn new_with_source(_source: Option<&str>) -> anyhow::Result<Self> {
+            Self::new()
+        }
+
         pub fn pull(&self) -> Option<AudioPacket> {
             None
         }
@@ -196,6 +214,14 @@
             Ok(Self)
         }
 
+        pub fn new_default_sink() -> anyhow::Result<Self> {
+            Self::new()
+        }
+
+        pub fn new_with_sink(_sink: Option<&str>) -> anyhow::Result<Self> {
+            Self::new()
+        }
+
         pub fn push(&self, _pkt: AudioPacket) {}
     }
 }
diff --git a/testing/tests/client_output_audio_include_contract.rs b/testing/tests/client_output_audio_include_contract.rs
index 45c0019..5adb7fe 100644
--- a/testing/tests/client_output_audio_include_contract.rs
+++ b/testing/tests/client_output_audio_include_contract.rs
@@ -41,7 +41,7 @@ mod audio_include_contract {
     #[serial]
     fn pick_sink_element_prefers_operator_override() {
         with_var("LESAVKA_AUDIO_SINK", Some("fakesink sync=false"), || {
-            let sink = pick_sink_element().expect("override sink");
+            let sink = pick_sink_element(None, true).expect("override sink");
             assert_eq!(sink, "fakesink sync=false");
         });
     }
@@ -53,7 +53,7 @@
             "LESAVKA_AUDIO_SINK",
             Some("alsa_output.pci-0000_00_1f.3.analog-stereo"),
             || {
-                let sink = pick_sink_element().expect("device sink");
+                let sink = pick_sink_element(None, true).expect("device sink");
                 assert_eq!(
                     sink,
                     "pulsesink device=\"alsa_output.pci-0000_00_1f.3.analog-stereo\" buffer-time=350000 latency-time=100000 sync=true"
@@ -83,7 +83,7 @@ exit 0
                 "DEFAULT".to_string()
             )]
         );
-        let sink = pick_sink_element().expect("pick sink");
+        let sink = pick_sink_element(None, true).expect("pick sink");
         assert_eq!(
             sink,
             "pulsesink device=\"alsa_output.usb-DAC_1234-00.analog-stereo\" buffer-time=350000 latency-time=100000 sync=true"
@@ -99,7 +99,7 @@ exit 0
             "LESAVKA_AUDIO_SINK",
             Some("bluez_output.80_C3_BA_76_26_AB.1"),
             || {
-                let sink = pick_sink_element().expect("bluetooth sink");
+                let sink = pick_sink_element(None, true).expect("bluetooth sink");
                 assert_eq!(
                     sink,
                     "pulsesink device=\"bluez_output.80_C3_BA_76_26_AB.1\" buffer-time=750000 latency-time=250000 sync=true"
@@ -125,7 +125,7 @@ exit 0
             list_pw_sinks().is_empty(),
             "no default sink should be parsed"
         );
-        let sink = pick_sink_element().expect("fallback sink");
+        let sink = pick_sink_element(None, true).expect("fallback sink");
         assert_eq!(sink, "autoaudiosink");
         });
     });
@@ -142,6 +142,7 @@ exit 0
             id: 0,
             pts: 1_234,
             data: vec![0xFF, 0xF1, 0x50, 0x80, 0x00, 0x1F, 0xFC],
+            ..AudioPacket::default()
         });
         drop(out);
     }
@@ -180,6 +181,7 @@
                 id: 0,
                 pts: 42_666,
                 data: vec![0xFF, 0xF1, 0x50, 0x80, 0x00, 0x1F, 0xFC],
+                ..AudioPacket::default()
             },
             &timeline,
         );
@@ -356,6 +358,7 @@ exit 1
                 id: 0,
                 pts: 1_500,
                 data: vec![0xFF, 0xF1, 0x50, 0x80, 0x00, 0x1F, 0xFC],
+                ..AudioPacket::default()
             },
             &timeline,
         );
diff --git a/testing/tests/client_runtime_smoke_contract.rs b/testing/tests/client_runtime_smoke_contract.rs
index f72e966..bd81f6c 100644
--- a/testing/tests/client_runtime_smoke_contract.rs
+++ b/testing/tests/client_runtime_smoke_contract.rs
@@ -101,6 +101,7 @@ fn audio_out_constructor_and_push_are_stable() {
                 id: 0,
                 pts: 0,
                 data: Vec::new(),
+                ..AudioPacket::default()
             });
         }
         Err(err) => {
diff --git a/testing/tests/handshake_camera_contract.rs b/testing/tests/handshake_camera_contract.rs
index 74acf5b..fa6df48 100644
--- a/testing/tests/handshake_camera_contract.rs
+++ b/testing/tests/handshake_camera_contract.rs
@@ -142,6 +142,7 @@ impl Handshake for SparseHandshakeSvc {
             eye_height: 0,
             eye_fps: 0,
             server_revision: String::new(),
+            bundled_webcam_media: false,
         }))
     }
 }
diff --git a/testing/tests/server_audio_include_contract.rs b/testing/tests/server_audio_include_contract.rs
index b4a7a8f..8086c46 100644
--- a/testing/tests/server_audio_include_contract.rs
+++ b/testing/tests/server_audio_include_contract.rs
@@ -135,6 +135,7 @@ mod tests {
             id: 0,
             pts: 77,
             data: vec![0xFF, 0xF1, 0x50, 0x80, 0x00, 0x1F, 0xFC],
+            ..AudioPacket::default()
         });
         voice.finish();
     }
diff --git a/testing/tests/server_upstream_media_contract.rs b/testing/tests/server_upstream_media_contract.rs
index 411586b..d9be771 100644
--- a/testing/tests/server_upstream_media_contract.rs
+++ b/testing/tests/server_upstream_media_contract.rs
@@ -107,6 +107,7 @@ mod server_upstream_media {
                 id: 0,
                 pts: 12_345,
                 data: vec![1, 2, 3, 4, 5, 6],
+                ..AudioPacket::default()
             })
             .await
             .expect("send synthetic upstream audio");
@@ -287,6 +288,7 @@ mod server_upstream_media {
                 id: 0,
                 pts: 1_000_000,
                 data: vec![1, 2, 3, 4],
+                ..AudioPacket::default()
             })
             .await
             .expect("send matching audio packet");
diff --git a/testing/tests/server_upstream_media_pairing_contract.rs b/testing/tests/server_upstream_media_pairing_contract.rs
index 1f99f18..0800e0b 100644
--- a/testing/tests/server_upstream_media_pairing_contract.rs
+++ b/testing/tests/server_upstream_media_pairing_contract.rs
@@ -193,6 +193,7 @@ mod server_upstream_media_pairing {
                 id: 0,
                 pts: 1_000_000,
                 data: vec![1, 2, 3, 4],
+                ..AudioPacket::default()
             })
             .await
             .expect("send anchor audio packet");
@@ -254,6 +255,7 @@ mod server_upstream_media_pairing {
                 id: 0,
                 pts: 12_345,
                 data: vec![1, 2, 3, 4, 5, 6],
+                ..AudioPacket::default()
             })
             .await
             .expect("send stale synthetic upstream audio");
@@ -320,6 +322,7 @@ mod server_upstream_media_pairing {
                 id: 0,
                 pts: 1_000_000,
                 data: vec![1, 2, 3, 4],
+                ..AudioPacket::default()
             })
             .await
             .expect("send leading audio packet");
@@ -338,6 +341,7 @@ mod server_upstream_media_pairing {
                 id: 0,
                 pts: 1_310_000,
                 data: vec![5, 6, 7, 8],
+                ..AudioPacket::default()
             })
             .await
             .expect("send post-anchor audio packet");
@@ -419,6 +423,7 @@ mod server_upstream_media_pairing {
                 id: 0,
                 pts: 1_300_000,
                 data: vec![1, 2, 3, 4],
+                ..AudioPacket::default()
             })
             .await
             .expect("send anchor audio packet");
diff --git a/testing/tests/server_upstream_media_pairing_freshness_contract.rs b/testing/tests/server_upstream_media_pairing_freshness_contract.rs
index 365e594..d927538 100644
--- a/testing/tests/server_upstream_media_pairing_freshness_contract.rs
+++ b/testing/tests/server_upstream_media_pairing_freshness_contract.rs
@@ -106,6 +106,7 @@ mod server_upstream_media_pairing {
                 id: 0,
                 pts: 12_345,
                 data: vec![1, 2, 3, 4, 5, 6],
+                ..AudioPacket::default()
             })
             .await
             .expect("send stale synthetic upstream audio");
@@ -170,6 +171,7 @@ mod server_upstream_media_pairing {
                 id: 0,
                 pts: 1_000_000,
                 data: vec![1, 2, 3, 4],
+                ..AudioPacket::default()
             })
             .await
             .expect("send first audio packet");