media: bundle webcam audio and video uplink

Brad Stein 2026-05-03 02:13:39 -03:00
parent ffd6a08749
commit c7ccfe9952
29 changed files with 1186 additions and 23 deletions

View File

@@ -1,5 +1,74 @@
# Lesavka Agent Notes
## 0.17.38 Bundled Webcam A/V Migration Checklist
Context: manual Google Meet and mirrored-probe testing showed the split webcam
and microphone uplink design is too fragile under real browser/device pressure.
The new product contract is: when webcam video is present, microphone audio
travels with it on one client-owned upstream media stream. The server manages
freshness and smoothness after arrival; it no longer tries to make two racing
upstream channels look synchronized. Microphone-only remains supported as the
explicit no-camera path.
### Product Invariants
- [x] Webcam-enabled sessions use one bundled upstream media RPC by default
(mode selection is sketched after this list).
- [x] Webcam-enabled sessions imply microphone capture when the server supports UAC.
- [x] The UI-selected camera, camera quality, microphone, speaker, gain, and
enable switches remain authoritative; defaults may not override visible UI state.
- [x] Client capture timestamps are the source of A/V sync truth for webcam sessions.
- [x] Server bundled playout rebases that client timeline onto a fresh local epoch.
- [x] Server bundled playout may drop stale packets, but must not rebuild sync by
independently pairing separate camera and microphone streams.
- [x] Mic-only sessions keep the existing microphone stream path.
- [x] Legacy split webcam/mic uplink is only an explicit compatibility escape hatch.
- [ ] Manual probes and diagnostics clearly label `bundled-webcam-media` versus
`mic-only` so we never confuse the architectures during debugging.
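A minimal sketch of how these invariants compose into one mode decision. The
enum and function names are illustrative only, not the shipped API; the real
gating (negotiated caps plus the `LESAVKA_LEGACY_SPLIT_UPLINK` override) lives
in the client startup diff below.

```rust
// Hypothetical sketch of the invariants above; names are illustrative.
#[derive(Debug, PartialEq)]
enum UpstreamMode {
    BundledWebcamMedia, // webcam present: mic audio rides the same stream
    MicOnly,            // the explicit no-camera path
    LegacySplit,        // compatibility escape hatch only
}

fn select_upstream_mode(
    camera: bool,
    microphone: bool,
    server_bundled: bool,
    legacy_split_override: bool, // corresponds to LESAVKA_LEGACY_SPLIT_UPLINK
) -> Option<UpstreamMode> {
    if camera && microphone && server_bundled {
        Some(UpstreamMode::BundledWebcamMedia)
    } else if camera && legacy_split_override {
        Some(UpstreamMode::LegacySplit)
    } else if microphone {
        Some(UpstreamMode::MicOnly)
    } else {
        None
    }
}
```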
### Wire Protocol
- [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or
more audio packets from the same client capture timeline.
- [x] Add `StreamWebcamMedia(stream UpstreamMediaBundle)` to the relay service.
- [x] Advertise bundled support in the handshake capability set.
- [ ] Add compatibility tests proving older split RPCs are not used by normal
webcam sessions when bundled support is advertised.
### Client Migration
- [x] Change session startup to prefer bundled webcam media whenever camera and
microphone are both available.
- [x] Spawn one bundled capture/uplink task instead of separate camera and mic
tasks for webcam sessions.
- [x] Bundle camera frames and microphone packets into one freshness-bounded
queue (cadence sketched after this list).
- [x] Stamp all packets at capture/uplink enqueue before the async gRPC stream
can add misleading delay.
- [x] Preserve live UI device/profile changes by restarting the bundled capture
pipeline when selected camera, camera quality, or microphone changes.
- [ ] Make launcher diagnostics expose the active upstream mode as first-class
text rather than inferring from separate camera/mic telemetry.
- [ ] Migrate sync-probe runner to the bundled path explicitly and remove any
normal probe dependence on split `StreamCamera` + `StreamMicrophone`.
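A simplified sketch of the bundling cadence these items describe, condensed
from the `bundle_captured_media` implementation later in this diff: audio
accumulates until a video frame arrives, the pending batch reaches its cap, or
a flush timer fires. Payload types are placeholders; the constants mirror
`BUNDLED_AUDIO_FLUSH_INTERVAL` and `BUNDLED_AUDIO_MAX_PENDING` below.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

const FLUSH_INTERVAL: Duration = Duration::from_millis(20); // BUNDLED_AUDIO_FLUSH_INTERVAL
const MAX_PENDING: usize = 8; // BUNDLED_AUDIO_MAX_PENDING

enum Event {
    Audio(Vec<u8>), // placeholder for AudioPacket
    Video(Vec<u8>), // placeholder for VideoPacket
}

fn bundle_loop(rx: Receiver<Event>, mut emit: impl FnMut(Option<Vec<u8>>, Vec<Vec<u8>>)) {
    let mut pending_audio = Vec::new();
    let mut next_flush = Instant::now() + FLUSH_INTERVAL;
    loop {
        let timeout = next_flush.saturating_duration_since(Instant::now());
        match rx.recv_timeout(timeout) {
            // Audio waits for a video frame unless the batch cap is hit.
            Ok(Event::Audio(pkt)) => {
                pending_audio.push(pkt);
                if pending_audio.len() >= MAX_PENDING {
                    emit(None, std::mem::take(&mut pending_audio));
                    next_flush = Instant::now() + FLUSH_INTERVAL;
                }
            }
            // A video frame always flushes the pending audio with it, so one
            // bundle covers one capture window.
            Ok(Event::Video(frame)) => {
                emit(Some(frame), std::mem::take(&mut pending_audio));
                next_flush = Instant::now() + FLUSH_INTERVAL;
            }
            // The timer bounds audio-only latency between video frames.
            Err(RecvTimeoutError::Timeout) => {
                if !pending_audio.is_empty() {
                    emit(None, std::mem::take(&mut pending_audio));
                }
                next_flush = Instant::now() + FLUSH_INTERVAL;
            }
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}
```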
### Server Migration
- [x] Implement `StreamWebcamMedia` and make it own both UAC and UVC/HDMI sinks
for one upstream session.
- [x] Schedule bundled packets by shared client capture timestamp instead of
startup-pairing independent streams (rebase math sketched after this list).
- [x] Keep server freshness drops/reanchors active for bundled media.
- [x] Continue reporting client timing and sink handoff diagnostics from bundled packets.
- [ ] Add bundled-mode counters for first bundle, first audio push, first video feed,
dropped stale bundles, and bundle queue age.
- [ ] Retire split-stream planner assumptions from the webcam path after the
bundled mode passes manual Google Meet and mirrored-probe validation.
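The rebase these items describe reduces to a small amount of arithmetic. The
sketch below strips away monotonicity nudges, sink offsets, and reanchoring
(the full version is `plan_bundled_pts` later in this diff):

```rust
use std::time::{Duration, Instant};

// Sketch only: drop packets captured before the session base, otherwise
// rebase the shared client capture timestamp onto the local playout epoch.
fn bundled_due_at(remote_pts_us: u64, base_remote_pts_us: u64, epoch: Instant) -> Option<Instant> {
    let local_pts_us = remote_pts_us.checked_sub(base_remote_pts_us)?;
    Some(epoch + Duration::from_micros(local_pts_us))
}
```

Audio and video schedule through the same arithmetic, so a bundle's relative
offsets survive the rebase unchanged; freshness is enforced separately by
stale drops and reanchors.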
### Validation
- [x] `cargo check -p lesavka_common -p lesavka_client -p lesavka_server --bins`
- [x] Focused handshake and launcher tests.
- [ ] Focused server upstream-media tests including bundled stream acceptance.
- [ ] Install on both ends and verify diagnostics show bundled webcam media.
- [ ] Manual Google Meet test: camera starts, video is not black/unsupported,
audio is intelligible, and lip sync is inside the acceptable band.
- [ ] Mirrored browser probe: recordings open in Dolphin and automated analyzer
no longer depends on fragile split-channel assumptions.
## A/V Sync Probe And Lip-Sync Validation Checklist
Context: Google Meet testing on 2026-04-30 showed audio roughly 8 seconds behind video even though internal client/server telemetry reported fresh uplink packets. Treat this as a product correctness failure, not a calibration issue. Do not resume blind lip-sync tuning until the probe can explain where delay appears.

Cargo.lock generated
View File

@@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
-version = "0.17.37"
+version = "0.17.38"
dependencies = [
"anyhow",
"async-stream",
@@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
-version = "0.17.37"
+version = "0.17.38"
dependencies = [
"anyhow",
"base64",
@@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
-version = "0.17.37"
+version = "0.17.38"
dependencies = [
"anyhow",
"base64",
@@ -1731,6 +1731,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"base64",
"chacha20poly1305",
"chrono",
"evdev",

View File

@@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
-version = "0.17.37"
+version = "0.17.38"
edition = "2024"
[dependencies]

View File

@@ -19,8 +19,8 @@ use winit::{
};
use lesavka_common::lesavka::{
-AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
-relay_client::RelayClient,
+AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, UpstreamMediaBundle,
+VideoPacket, relay_client::RelayClient,
};
#[cfg(not(coverage))]

View File

@@ -62,10 +62,14 @@ impl LesavkaClientApp {
let initial_cam_profile = initial_camera_profile_id_from_env();
let initial_mic_source = std::env::var("LESAVKA_MIC_SOURCE").ok();
let initial_audio_sink = std::env::var("LESAVKA_AUDIO_SINK").ok();
let camera_enabled = caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err();
let microphone_available = caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err();
let bundled_webcam_media =
camera_enabled && microphone_available && caps.bundled_webcam_media;
let media_controls = crate::live_media_control::LiveMediaControls::from_env(
crate::live_media_control::MediaControlState::with_devices(
-caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(),
-caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(),
+camera_enabled,
+microphone_available || bundled_webcam_media,
std::env::var("LESAVKA_AUDIO_DISABLE").is_err(),
initial_cam_source.clone(),
initial_cam_profile.clone(),
@@ -229,31 +233,62 @@ impl LesavkaClientApp {
} else {
info!("🧪 headless mode: skipping video/audio renderers");
}
-/*────────── camera & mic tasks (gated by caps) ───────────*/
+/*────────── upstream webcam/mic tasks (gated by caps) ───────────*/
-if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() {
+if bundled_webcam_media {
if let Some(cfg) = camera_cfg {
info!(
codec = ?cfg.codec,
width = cfg.width,
height = cfg.height,
fps = cfg.fps,
-"📸 using negotiated server UVC caps for emitted format; launcher quality still controls local capture"
+"📦 using bundled webcam A/V uplink; client capture owns sync, server owns freshness"
);
}
let ep = vid_ep.clone();
let cam_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera);
let mic_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone);
let media_controls = media_controls.clone();
-tokio::spawn(Self::cam_loop(
+tokio::spawn(Self::webcam_media_loop(
ep,
initial_cam_source.clone(),
initial_cam_profile.clone(),
initial_mic_source.clone(),
camera_cfg,
cam_telemetry,
mic_telemetry,
media_controls,
));
} else if camera_enabled {
warn!(
"📦 server did not advertise bundled webcam A/V; split camera uplink is disabled unless LESAVKA_LEGACY_SPLIT_UPLINK=1"
);
if std::env::var("LESAVKA_LEGACY_SPLIT_UPLINK").is_ok() {
if let Some(cfg) = camera_cfg {
info!(
codec = ?cfg.codec,
width = cfg.width,
height = cfg.height,
fps = cfg.fps,
"📸 using legacy split camera uplink by explicit override"
);
}
let ep = vid_ep.clone();
let cam_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera);
let media_controls = media_controls.clone();
tokio::spawn(Self::cam_loop(
ep,
initial_cam_source.clone(),
initial_cam_profile.clone(),
camera_cfg,
cam_telemetry,
media_controls,
));
}
}
-if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() {
+if microphone_available && !bundled_webcam_media {
let ep = vid_ep.clone();
let mic_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone);

View File

@@ -1,4 +1,288 @@
impl LesavkaClientApp {
/*──────────────── bundled webcam + mic stream ─────────────────*/
#[cfg(not(coverage))]
async fn webcam_media_loop(
ep: Channel,
initial_camera_source: Option<String>,
initial_camera_profile: Option<String>,
initial_microphone_source: Option<String>,
camera_cfg: Option<crate::input::camera::CameraConfig>,
camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
media_controls: crate::live_media_control::LiveMediaControls,
) {
let mut delay = Duration::from_secs(1);
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
loop {
let state = media_controls.refresh();
let camera_requested = state.camera;
let microphone_requested = state.microphone || state.camera;
if !camera_requested && !microphone_requested {
camera_telemetry.record_enabled(false);
microphone_telemetry.record_enabled(false);
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
let active_camera_source = state.camera_source.resolve(initial_camera_source.as_deref());
let active_camera_profile =
state.camera_profile.resolve(initial_camera_profile.as_deref());
let active_microphone_source = state
.microphone_source
.resolve(initial_microphone_source.as_deref());
let capture_profile = active_camera_profile
.as_deref()
.and_then(parse_camera_profile_id);
let use_default_microphone = matches!(
state.microphone_source,
crate::live_media_control::MediaDeviceChoice::Auto
) && active_microphone_source.is_none();
let setup_camera_source = active_camera_source.clone();
let setup_microphone_source = active_microphone_source.clone();
let setup = tokio::task::spawn_blocking(move || {
let microphone = if use_default_microphone {
MicrophoneCapture::new_default_source()
} else {
MicrophoneCapture::new_with_source(setup_microphone_source.as_deref())
}?;
let camera = if camera_requested {
Some(CameraCapture::new_with_capture_profile(
setup_camera_source.as_deref(),
camera_cfg,
capture_profile,
)?)
} else {
None
};
Ok::<_, anyhow::Error>((camera.map(Arc::new), Arc::new(microphone)))
})
.await;
let (camera, microphone) = match setup {
Ok(Ok(captures)) => captures,
Ok(Err(err)) => {
camera_telemetry.record_disconnect(format!(
"bundled webcam media setup failed: {err:#}"
));
microphone_telemetry.record_disconnect(format!(
"bundled webcam media setup failed: {err:#}"
));
warn!(
"📦 bundled webcam media setup failed for camera={:?} mic={:?}: {err:#}",
active_camera_source.as_deref().unwrap_or("auto"),
active_microphone_source.as_deref().unwrap_or("auto")
);
if camera_requested {
abort_if_required_media_source_failed(
"camera",
"📸",
active_camera_source.as_deref(),
&err,
);
}
abort_if_required_media_source_failed(
"microphone",
"🎤",
active_microphone_source.as_deref(),
&err,
);
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
Err(err) => {
camera_telemetry.record_disconnect(format!(
"bundled webcam media setup task failed: {err}"
));
microphone_telemetry.record_disconnect(format!(
"bundled webcam media setup task failed: {err}"
));
warn!("📦 bundled webcam media setup task failed: {err}");
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
};
camera_telemetry.record_reconnect_attempt();
microphone_telemetry.record_reconnect_attempt();
let mut cli = RelayClient::new(ep.clone());
let queue: crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle> =
crate::uplink_fresh_queue::FreshPacketQueue::new(BUNDLED_MEDIA_UPLINK_QUEUE);
let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new(
"bundled-webcam-media",
"📦",
)));
let queue_stream = queue.clone();
let camera_telemetry_stream = camera_telemetry.clone();
let microphone_telemetry_stream = microphone_telemetry.clone();
let drop_log_stream = Arc::clone(&drop_log);
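// Outbound half of the RPC: drain the freshness-bounded queue, record
// stale drops against both stream kinds, and stamp queue metadata on each
// bundle just before it leaves the client.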
let outbound = async_stream::stream! {
loop {
let next = queue_stream.pop_fresh().await;
if next.dropped_stale > 0 {
camera_telemetry_stream.record_stale_drop(next.dropped_stale);
microphone_telemetry_stream.record_stale_drop(next.dropped_stale);
log_uplink_drop(
&drop_log_stream,
UplinkDropReason::Stale,
next.dropped_stale,
next.queue_depth,
duration_ms(next.delivery_age),
);
}
if let Some(mut bundle) = next.packet {
let queue_depth = queue_depth_u32(next.queue_depth);
let delivery_age_ms = duration_ms(next.delivery_age);
if bundle.video.is_some() {
camera_telemetry_stream.record_streamed(
queue_depth,
delivery_age_ms,
);
}
if !bundle.audio.is_empty() {
microphone_telemetry_stream.record_streamed(
queue_depth,
delivery_age_ms,
);
}
attach_bundle_queue_metadata(&mut bundle, next.queue_depth, next.delivery_age);
yield bundle;
continue;
}
break;
}
};
match cli.stream_webcam_media(Request::new(outbound)).await {
Ok(mut resp) => {
let stop = Arc::new(AtomicBool::new(false));
let (event_tx, event_rx) = std::sync::mpsc::channel::<BundledCaptureEvent>();
let camera_worker = camera.as_ref().map(|camera| {
let camera = Arc::clone(camera);
let stop = Arc::clone(&stop);
let event_tx = event_tx.clone();
let media_controls = media_controls.clone();
let initial_camera_source = initial_camera_source.clone();
let initial_camera_profile = initial_camera_profile.clone();
let active_camera_source = active_camera_source.clone();
let active_camera_profile = active_camera_profile.clone();
std::thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
let state = media_controls.refresh();
let desired_source =
state.camera_source.resolve(initial_camera_source.as_deref());
let desired_profile =
state.camera_profile.resolve(initial_camera_profile.as_deref());
if !state.camera
|| desired_source != active_camera_source
|| desired_profile != active_camera_profile
{
let _ = event_tx.send(BundledCaptureEvent::Restart);
break;
}
if let Some(mut pkt) = camera.pull() {
let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt);
if event_tx.send(BundledCaptureEvent::Video(pkt)).is_err() {
break;
}
}
}
})
});
let microphone_worker = {
let microphone = Arc::clone(&microphone);
let stop = Arc::clone(&stop);
let event_tx = event_tx.clone();
let media_controls = media_controls.clone();
let initial_microphone_source = initial_microphone_source.clone();
let active_microphone_source = active_microphone_source.clone();
let active_camera_requested = camera_requested;
std::thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
let state = media_controls.refresh();
let desired_source = state
.microphone_source
.resolve(initial_microphone_source.as_deref());
if state.camera != active_camera_requested
|| !(state.microphone || state.camera)
|| desired_source != active_microphone_source
{
let _ = event_tx.send(BundledCaptureEvent::Restart);
break;
}
if let Some(mut pkt) = microphone.pull() {
let _ = stamp_audio_timing_metadata_at_enqueue(&mut pkt);
if event_tx.send(BundledCaptureEvent::Audio(pkt)).is_err() {
break;
}
}
}
})
};
drop(event_tx);
let bundle_worker = {
let stop = Arc::clone(&stop);
let queue = queue.clone();
let camera_telemetry = camera_telemetry.clone();
let microphone_telemetry = microphone_telemetry.clone();
let drop_log = Arc::clone(&drop_log);
std::thread::spawn(move || {
bundle_captured_media(
event_rx,
stop,
queue,
camera_telemetry,
microphone_telemetry,
drop_log,
);
})
};
delay = Duration::from_secs(1);
camera_telemetry.record_connected();
microphone_telemetry.record_connected();
while resp.get_mut().message().await.transpose().is_some() {}
camera_telemetry.record_disconnect("bundled webcam media stream ended");
microphone_telemetry.record_disconnect("bundled webcam media stream ended");
stop.store(true, Ordering::Relaxed);
queue.close();
if let Some(worker) = camera_worker {
let _ = worker.join();
}
let _ = microphone_worker.join();
let _ = bundle_worker.join();
}
Err(e) if e.code() == tonic::Code::Unimplemented => {
camera_telemetry.record_disconnect("bundled webcam media unavailable on server");
microphone_telemetry
.record_disconnect("bundled webcam media unavailable on server");
warn!("📦 server does not support bundled webcam media retrying");
delay = app_support::next_delay(delay);
}
Err(e) => {
camera_telemetry
.record_disconnect(format!("bundled webcam media connect failed: {e}"));
microphone_telemetry
.record_disconnect(format!("bundled webcam media connect failed: {e}"));
if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
warn!("❌📦 bundled webcam media connect failed: {e}");
} else {
debug!("❌📦 bundled webcam media reconnect failed: {e}");
}
delay = app_support::next_delay(delay);
}
}
queue.close();
tokio::time::sleep(delay).await;
}
}
/*──────────────── mic stream ─────────────────*/
#[cfg(not(coverage))]
async fn voice_loop(
@@ -485,6 +769,200 @@ const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
policy: crate::uplink_fresh_queue::FreshQueuePolicy::DrainOldest,
};
#[cfg(not(coverage))]
const BUNDLED_MEDIA_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
crate::uplink_fresh_queue::FreshQueueConfig {
capacity: 16,
max_age: Duration::from_millis(350),
policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly,
};
#[cfg(not(coverage))]
const BUNDLED_AUDIO_FLUSH_INTERVAL: Duration = Duration::from_millis(20);
#[cfg(not(coverage))]
const BUNDLED_AUDIO_MAX_PENDING: usize = 8;
#[cfg(not(coverage))]
#[derive(Debug)]
enum BundledCaptureEvent {
Audio(AudioPacket),
Video(VideoPacket),
Restart,
}
#[cfg(not(coverage))]
fn bundle_captured_media(
event_rx: std::sync::mpsc::Receiver<BundledCaptureEvent>,
stop: Arc<AtomicBool>,
queue: crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle>,
camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
drop_log: Arc<std::sync::Mutex<UplinkDropLogLimiter>>,
) {
static BUNDLED_SESSION: AtomicU64 = AtomicU64::new(0);
let session_id = BUNDLED_SESSION
.fetch_add(1, Ordering::Relaxed)
.saturating_add(1);
let mut bundle_seq = 0_u64;
let mut pending_audio = Vec::new();
let mut next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL;
loop {
if stop.load(Ordering::Relaxed) {
break;
}
let timeout = next_audio_flush.saturating_duration_since(Instant::now());
match event_rx.recv_timeout(timeout) {
Ok(BundledCaptureEvent::Audio(packet)) => {
pending_audio.push(packet);
if pending_audio.len() >= BUNDLED_AUDIO_MAX_PENDING {
emit_bundled_media(
session_id,
&mut bundle_seq,
None,
std::mem::take(&mut pending_audio),
&queue,
&camera_telemetry,
&microphone_telemetry,
&drop_log,
);
next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL;
}
}
Ok(BundledCaptureEvent::Video(packet)) => {
emit_bundled_media(
session_id,
&mut bundle_seq,
Some(packet),
std::mem::take(&mut pending_audio),
&queue,
&camera_telemetry,
&microphone_telemetry,
&drop_log,
);
next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL;
}
Ok(BundledCaptureEvent::Restart) => {
stop.store(true, Ordering::Relaxed);
break;
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
if !pending_audio.is_empty() {
emit_bundled_media(
session_id,
&mut bundle_seq,
None,
std::mem::take(&mut pending_audio),
&queue,
&camera_telemetry,
&microphone_telemetry,
&drop_log,
);
}
next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL;
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
}
}
queue.close();
}
#[cfg(not(coverage))]
fn emit_bundled_media(
session_id: u64,
bundle_seq: &mut u64,
video: Option<VideoPacket>,
audio: Vec<AudioPacket>,
queue: &crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle>,
camera_telemetry: &crate::uplink_telemetry::UplinkTelemetryHandle,
microphone_telemetry: &crate::uplink_telemetry::UplinkTelemetryHandle,
drop_log: &Arc<std::sync::Mutex<UplinkDropLogLimiter>>,
) {
if video.is_none() && audio.is_empty() {
return;
}
*bundle_seq = bundle_seq.saturating_add(1);
let (capture_start_us, capture_end_us) = bundled_capture_bounds(video.as_ref(), &audio);
let enqueue_now_us = crate::live_capture_clock::capture_pts_us();
let enqueue_age = Duration::from_micros(enqueue_now_us.saturating_sub(capture_start_us));
let has_video = video.is_some();
let has_audio = !audio.is_empty();
let mut bundle = UpstreamMediaBundle {
session_id,
seq: *bundle_seq,
capture_start_us,
capture_end_us,
video,
audio,
..UpstreamMediaBundle::default()
};
attach_bundle_queue_metadata(&mut bundle, 0, enqueue_age);
let stats = queue.push(bundle, enqueue_age);
if stats.dropped_queue_full > 0 {
if has_video {
camera_telemetry.record_queue_full_drop(stats.dropped_queue_full);
}
if has_audio {
microphone_telemetry.record_queue_full_drop(stats.dropped_queue_full);
}
log_uplink_drop(
drop_log,
UplinkDropReason::QueueFull,
stats.dropped_queue_full,
stats.queue_depth,
duration_ms(enqueue_age),
);
}
let queue_depth = queue_depth_u32(stats.queue_depth);
let age_ms = duration_ms(enqueue_age);
if has_video {
camera_telemetry.record_enqueue(queue_depth, age_ms, 0.0);
}
if has_audio {
microphone_telemetry.record_enqueue(queue_depth, age_ms, 0.0);
}
}
#[cfg(not(coverage))]
fn bundled_capture_bounds(video: Option<&VideoPacket>, audio: &[AudioPacket]) -> (u64, u64) {
let mut start = u64::MAX;
let mut end = 0_u64;
if let Some(video) = video {
let pts = packet_video_capture_pts_us(video);
start = start.min(pts);
end = end.max(pts);
}
for packet in audio {
let pts = packet_audio_capture_pts_us(packet);
start = start.min(pts);
end = end.max(pts);
}
if start == u64::MAX {
let now = crate::live_capture_clock::capture_pts_us();
return (now, now);
}
(start, end.max(start))
}
#[cfg(not(coverage))]
fn packet_audio_capture_pts_us(packet: &AudioPacket) -> u64 {
if packet.client_capture_pts_us == 0 {
packet.pts
} else {
packet.client_capture_pts_us
}
}
#[cfg(not(coverage))]
fn packet_video_capture_pts_us(packet: &VideoPacket) -> u64 {
if packet.client_capture_pts_us == 0 {
packet.pts
} else {
packet.client_capture_pts_us
}
}
#[cfg(not(coverage))]
fn queue_depth_u32(depth: usize) -> u32 {
depth.try_into().unwrap_or(u32::MAX)
@@ -553,6 +1031,20 @@ fn attach_video_queue_metadata(
packet.client_queue_age_ms = duration_ms_u32(delivery_age);
}
#[cfg(not(coverage))]
fn attach_bundle_queue_metadata(
bundle: &mut UpstreamMediaBundle,
queue_depth: usize,
delivery_age: Duration,
) {
for packet in &mut bundle.audio {
attach_audio_queue_metadata(packet, queue_depth, delivery_age);
}
if let Some(packet) = bundle.video.as_mut() {
attach_video_queue_metadata(packet, queue_depth, delivery_age);
}
}
#[cfg(not(coverage))]
#[derive(Clone, Copy, Debug)]
enum UplinkDropReason {

View File

@@ -97,6 +97,7 @@ mod tests {
let mut caps = PeerCaps {
camera: true,
microphone: false,
bundled_webcam_media: false,
server_version: None,
camera_output: Some(String::from("uvc")),
camera_codec: Some(String::from("mjpeg")),

View File

@@ -247,6 +247,7 @@ fn print_versions(server_addr: &str, caps: &HandshakeSet) {
println!("server_revision={server_revision}");
println!("server_camera_output={}", caps.camera_output);
println!("server_camera_codec={}", caps.camera_codec);
println!("server_bundled_webcam_media={}", caps.bundled_webcam_media);
}
fn print_upstream_sync(state: lesavka_common::lesavka::UpstreamSyncState) {

View File

@@ -12,6 +12,7 @@ use tracing::{info, warn};
pub struct PeerCaps {
pub camera: bool,
pub microphone: bool,
pub bundled_webcam_media: bool,
pub server_version: Option<String>,
pub camera_output: Option<String>,
pub camera_codec: Option<String>,
@@ -73,6 +74,7 @@ pub async fn negotiate(uri: &str) -> PeerCaps {
PeerCaps {
camera: rsp.camera,
microphone: rsp.microphone,
bundled_webcam_media: rsp.bundled_webcam_media,
server_version: (!rsp.server_version.is_empty())
.then_some(rsp.server_version.clone()),
camera_output: (!rsp.camera_output.is_empty()).then_some(rsp.camera_output.clone()),
@@ -119,6 +121,7 @@ pub async fn probe(uri: &str) -> HandshakeProbe {
caps: PeerCaps {
camera: rsp.camera,
microphone: rsp.microphone,
bundled_webcam_media: rsp.bundled_webcam_media,
server_version: (!rsp.server_version.is_empty())
.then_some(rsp.server_version.clone()),
camera_output: (!rsp.camera_output.is_empty())
@@ -196,6 +199,7 @@ pub async fn negotiate(uri: &str) -> PeerCaps {
let caps = PeerCaps {
camera: rsp.camera,
microphone: rsp.microphone,
bundled_webcam_media: rsp.bundled_webcam_media,
server_version: if rsp.server_version.is_empty() {
None
} else {
@@ -301,6 +305,7 @@ pub async fn probe(uri: &str) -> HandshakeProbe {
let caps = PeerCaps {
camera: rsp.camera,
microphone: rsp.microphone,
bundled_webcam_media: rsp.bundled_webcam_media,
server_version: (!rsp.server_version.is_empty())
.then_some(rsp.server_version.clone()),
camera_output: (!rsp.camera_output.is_empty())

View File

@@ -45,6 +45,8 @@ impl Relay for ProbeRelay {
Pin<Box<dyn futures::Stream<Item = Result<lesavka_common::lesavka::Empty, Status>> + Send>>;
type StreamCameraStream =
Pin<Box<dyn futures::Stream<Item = Result<lesavka_common::lesavka::Empty, Status>> + Send>>;
type StreamWebcamMediaStream =
Pin<Box<dyn futures::Stream<Item = Result<lesavka_common::lesavka::Empty, Status>> + Send>>;
async fn stream_keyboard(
&self,
@@ -90,6 +92,13 @@ impl Relay for ProbeRelay {
Ok(Response::new(Box::pin(stream::empty())))
}
async fn stream_webcam_media(
&self,
_request: Request<tonic::Streaming<lesavka_common::lesavka::UpstreamMediaBundle>>,
) -> Result<Response<Self::StreamWebcamMediaStream>, Status> {
Ok(Response::new(Box::pin(stream::empty())))
}
async fn paste_text(
&self,
_request: Request<lesavka_common::lesavka::PasteRequest>,

View File

@@ -6,7 +6,7 @@ use futures::stream;
use lesavka_common::lesavka::{
AudioPacket, CalibrationRequest, CalibrationState, CapturePowerState, Empty, KeyboardReport,
MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest,
-UpstreamSyncState, VideoPacket,
+UpstreamMediaBundle, UpstreamSyncState, VideoPacket,
relay_server::{Relay, RelayServer},
};
use serial_test::serial;
@@ -48,6 +48,7 @@ impl Relay for UtilityRelay {
type CaptureAudioStream = AudioStream;
type StreamMicrophoneStream = EmptyStream;
type StreamCameraStream = EmptyStream;
type StreamWebcamMediaStream = EmptyStream;
async fn stream_keyboard(
&self,
@@ -91,6 +92,13 @@ impl Relay for UtilityRelay {
Ok(Response::new(Box::pin(stream::empty())))
}
async fn stream_webcam_media(
&self,
_request: Request<tonic::Streaming<UpstreamMediaBundle>>,
) -> Result<Response<Self::StreamWebcamMediaStream>, Status> {
Ok(Response::new(Box::pin(stream::empty())))
}
async fn paste_text(
&self,
_request: Request<PasteRequest>,

View File

@@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
-version = "0.17.37"
+version = "0.17.38"
edition = "2024"
build = "build.rs"

View File

@@ -46,6 +46,23 @@ message AudioPacket {
uint32 client_queue_age_ms = 8;
}
message UpstreamMediaBundle {
// One client-owned webcam/microphone capture batch. When a webcam is active,
// this is the authoritative upstream transport so audio and video share one
// ordered gRPC stream instead of racing through independent channels.
uint64 session_id = 1;
uint64 seq = 2;
uint64 capture_start_us = 3;
uint64 capture_end_us = 4;
VideoPacket video = 5;
repeated AudioPacket audio = 6;
uint32 audio_sample_rate = 7;
uint32 audio_channels = 8;
uint32 video_width = 9;
uint32 video_height = 10;
uint32 video_fps = 11;
}
message ResetUsbReply { bool ok = 1; } // true = success
message PasteRequest {
@@ -157,6 +174,7 @@ message HandshakeSet {
uint32 eye_fps = 10;
string server_version = 11;
string server_revision = 12;
bool bundled_webcam_media = 13;
}
message Empty {}
@@ -168,6 +186,7 @@ service Relay {
rpc CaptureAudio (MonitorRequest) returns (stream AudioPacket);
rpc StreamMicrophone (stream AudioPacket) returns (stream Empty);
rpc StreamCamera (stream VideoPacket) returns (stream Empty);
rpc StreamWebcamMedia (stream UpstreamMediaBundle) returns (stream Empty);
rpc PasteText (PasteRequest) returns (PasteReply);
rpc RecoverUsb (Empty) returns (ResetUsbReply);

View File

@@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
-version = "0.17.37"
+version = "0.17.38"
edition = "2024"
autobins = false

View File

@@ -35,6 +35,7 @@ impl Handshake for HandshakeSvc {
eye_fps,
server_version: crate::VERSION.to_string(),
server_revision: crate::REVISION.to_string(),
bundled_webcam_media: camera_enabled && microphone,
}))
}
}

View File

@@ -20,7 +20,7 @@ use tracing::{debug, error, info, warn};
use lesavka_common::lesavka::{
AudioPacket, CalibrationRequest, CalibrationState, CapturePowerCommand, CapturePowerState,
Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply,
-SetCapturePowerRequest, UpstreamSyncState, VideoPacket,
+SetCapturePowerRequest, UpstreamMediaBundle, UpstreamSyncState, VideoPacket,
relay_server::{Relay, RelayServer},
};

View File

@@ -1,3 +1,67 @@
#[cfg(not(coverage))]
#[derive(Debug)]
enum BundledUpstreamEvent {
Audio(AudioPacket),
Video(VideoPacket),
}
#[cfg(not(coverage))]
impl BundledUpstreamEvent {
fn remote_pts_us(&self) -> u64 {
match self {
Self::Audio(packet) => packet.pts,
Self::Video(packet) => packet.pts,
}
}
fn kind(&self) -> UpstreamMediaKind {
match self {
Self::Audio(_) => UpstreamMediaKind::Microphone,
Self::Video(_) => UpstreamMediaKind::Camera,
}
}
}
#[cfg(not(coverage))]
#[derive(Clone, Copy, Debug, Default)]
struct BundledPlayoutClock {
base_remote_pts_us: Option<u64>,
epoch: Option<tokio::time::Instant>,
}
#[cfg(not(coverage))]
impl BundledPlayoutClock {
fn ensure(
&mut self,
bundle: &UpstreamMediaBundle,
events: &[BundledUpstreamEvent],
) -> Option<(u64, tokio::time::Instant)> {
if self.base_remote_pts_us.is_none() || self.epoch.is_none() {
let base = if bundle.capture_start_us != 0 {
bundle.capture_start_us
} else {
events.iter().map(BundledUpstreamEvent::remote_pts_us).min()?
};
self.base_remote_pts_us = Some(base);
self.epoch = Some(tokio::time::Instant::now() + bundled_upstream_playout_delay());
}
Some((
self.base_remote_pts_us.unwrap_or_default(),
self.epoch.expect("bundled epoch initialized"),
))
}
}
#[cfg(not(coverage))]
fn bundled_upstream_playout_delay() -> Duration {
std::env::var("LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS")
.or_else(|_| std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS"))
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.map(Duration::from_millis)
.unwrap_or_else(|| Duration::from_millis(350))
}
#[cfg(not(coverage))]
#[tonic::async_trait]
impl Relay for Handler {
@@ -7,6 +71,7 @@ impl Relay for Handler {
type CaptureAudioStream = AudioStream;
type StreamMicrophoneStream = ReceiverStream<Result<Empty, Status>>;
type StreamCameraStream = ReceiverStream<Result<Empty, Status>>;
type StreamWebcamMediaStream = ReceiverStream<Result<Empty, Status>>;
async fn stream_keyboard(
&self,
@@ -102,6 +167,210 @@ impl Relay for Handler {
Ok(Response::new(ReceiverStream::new(rx)))
}
/// Accept client-bundled webcam and microphone packets on one upstream clock.
async fn stream_webcam_media(
&self,
req: Request<tonic::Streaming<UpstreamMediaBundle>>,
) -> Result<Response<Self::StreamWebcamMediaStream>, Status> {
let rpc_id = runtime_support::next_stream_id();
let camera_cfg = camera::current_camera_config();
let microphone_lease = self.upstream_media_rt.activate_microphone();
let camera_lease = self.upstream_media_rt.activate_camera();
info!(
rpc_id,
session_id = camera_lease.session_id,
camera_generation = camera_lease.generation,
microphone_generation = microphone_lease.generation,
output = camera_cfg.output.as_str(),
codec = camera_cfg.codec.as_str(),
width = camera_cfg.width,
height = camera_cfg.height,
fps = camera_cfg.fps,
"📦 stream_webcam_media opened"
);
let Some(microphone_sink_permit) = self
.upstream_media_rt
.reserve_microphone_sink(microphone_lease.generation)
.await
else {
self.upstream_media_rt
.close_camera(camera_lease.generation);
self.upstream_media_rt
.close_microphone(microphone_lease.generation);
return Err(Status::aborted(
"bundled webcam media stream superseded before microphone sink became available",
));
};
let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
let mut sink = runtime_support::open_voice_with_retry(&uac_dev)
.await
.map_err(|e| {
self.upstream_media_rt
.close_camera(camera_lease.generation);
self.upstream_media_rt
.close_microphone(microphone_lease.generation);
Status::internal(format!("{e:#}"))
})?;
let (camera_session_id, relay, _relay_reused) =
match self.camera_rt.activate(&camera_cfg).await {
Ok(active) => active,
Err(err) => {
sink.finish();
self.upstream_media_rt
.close_camera(camera_lease.generation);
self.upstream_media_rt
.close_microphone(microphone_lease.generation);
return Err(err);
}
};
let camera_rt = self.camera_rt.clone();
let upstream_media_rt = self.upstream_media_rt.clone();
let frame_step_us = (1_000_000u64 / u64::from(camera_cfg.fps.max(1))).max(1);
let stale_drop_budget = upstream_stale_drop_budget();
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let _microphone_sink_permit = microphone_sink_permit;
let mut inbound = req.into_inner();
let mut clock = BundledPlayoutClock::default();
let mut outcome = "aborted";
'bundled_loop: loop {
let bundle = match inbound.next().await {
Some(Ok(bundle)) => bundle,
Some(Err(err)) => {
warn!(
rpc_id,
session_id = camera_lease.session_id,
"📦 stream_webcam_media inbound error before clean EOF: {err}"
);
break;
}
None => {
outcome = "closed";
break;
}
};
if !camera_rt.is_active(camera_session_id)
|| !upstream_media_rt.is_camera_active(camera_lease.generation)
|| !upstream_media_rt.is_microphone_active(microphone_lease.generation)
{
outcome = "superseded";
break;
}
let mut events = Vec::with_capacity(bundle.audio.len() + 1);
if let Some(video) = bundle.video.clone() {
upstream_media_rt
.record_client_timing(UpstreamMediaKind::Camera, video_client_timing(&video));
events.push(BundledUpstreamEvent::Video(video));
}
for audio in &bundle.audio {
upstream_media_rt.record_client_timing(
UpstreamMediaKind::Microphone,
audio_client_timing(audio),
);
events.push(BundledUpstreamEvent::Audio(audio.clone()));
}
if events.is_empty() {
continue;
}
events.sort_by_key(BundledUpstreamEvent::remote_pts_us);
let Some((base_remote_pts_us, epoch)) = clock.ensure(&bundle, &events) else {
continue;
};
for event in events {
let kind = event.kind();
let min_step_us = match kind {
UpstreamMediaKind::Camera => frame_step_us,
UpstreamMediaKind::Microphone => 1,
};
let plan = match upstream_media_rt.plan_bundled_pts(
kind,
event.remote_pts_us(),
min_step_us,
base_remote_pts_us,
epoch,
) {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => continue,
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(reason) => {
tracing::warn!(
rpc_id,
session_id = camera_lease.session_id,
?kind,
reason,
"📦 bundled upstream packet dropped by freshness planner"
);
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => continue,
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(reason) => {
tracing::error!(
rpc_id,
session_id = camera_lease.session_id,
reason,
"📦 bundled upstream startup failed"
);
break 'bundled_loop;
}
};
if plan.late_by > stale_drop_budget {
tracing::warn!(
rpc_id,
session_id = camera_lease.session_id,
?kind,
late_by_ms = plan.late_by.as_millis(),
pts = plan.local_pts_us,
"📦 bundled upstream packet dropped after missing freshness budget"
);
continue;
}
tokio::time::sleep_until(plan.due_at).await;
let actual_late_by = tokio::time::Instant::now()
.checked_duration_since(plan.due_at)
.unwrap_or_default();
if actual_late_by > stale_drop_budget {
tracing::warn!(
rpc_id,
session_id = camera_lease.session_id,
?kind,
late_by_ms = actual_late_by.as_millis(),
pts = plan.local_pts_us,
"📦 bundled upstream packet dropped after waking too late"
);
continue;
}
match event {
BundledUpstreamEvent::Audio(mut packet) => {
packet.pts = plan.local_pts_us;
sink.push(&packet);
upstream_media_rt.mark_audio_presented(packet.pts, plan.due_at);
}
BundledUpstreamEvent::Video(mut packet) => {
packet.pts = plan.local_pts_us;
let presented_pts = packet.pts;
relay.feed(packet);
upstream_media_rt.mark_video_presented(presented_pts, plan.due_at);
}
}
}
}
sink.finish();
upstream_media_rt.close_camera(camera_lease.generation);
upstream_media_rt.close_microphone(microphone_lease.generation);
info!(
rpc_id,
session_id = camera_lease.session_id,
camera_session_id,
outcome,
"📦 stream_webcam_media lifecycle ended"
);
tx.send(Ok(Empty {})).await.ok();
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
/// Accept synthetic upstream microphone packets without ALSA hardware.
async fn stream_microphone(
&self,

View File

@@ -45,6 +45,7 @@ impl Relay for Handler {
type CaptureAudioStream = AudioStream;
type StreamMicrophoneStream = ReceiverStream<Result<Empty, Status>>;
type StreamCameraStream = ReceiverStream<Result<Empty, Status>>;
type StreamWebcamMediaStream = ReceiverStream<Result<Empty, Status>>;
async fn stream_keyboard(
&self,
@@ -186,6 +187,40 @@ impl Relay for Handler {
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_webcam_media(
&self,
req: Request<tonic::Streaming<UpstreamMediaBundle>>,
) -> Result<Response<Self::StreamWebcamMediaStream>, Status> {
let microphone_lease = self.upstream_media_rt.activate_microphone();
let camera_lease = self.upstream_media_rt.activate_camera();
let (tx, rx) = tokio::sync::mpsc::channel(1);
let upstream_media_rt = self.upstream_media_rt.clone();
tokio::spawn(async move {
let mut inbound = req.into_inner();
while let Some(bundle) = inbound.next().await.transpose()? {
if let Some(video) = bundle.video {
upstream_media_rt.record_client_timing(
UpstreamMediaKind::Camera,
video_client_timing(&video),
);
}
for audio in bundle.audio {
upstream_media_rt.record_client_timing(
UpstreamMediaKind::Microphone,
audio_client_timing(&audio),
);
}
}
upstream_media_rt.close_camera(camera_lease.generation);
upstream_media_rt.close_microphone(microphone_lease.generation);
let _ = tx.send(Ok(Empty {})).await;
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_camera(
&self,
req: Request<tonic::Streaming<VideoPacket>>,

View File

@@ -292,6 +292,142 @@ impl UpstreamMediaRuntime {
self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1)
}
/// Schedule a packet from the bundled webcam/microphone transport.
///
/// Inputs: the media kind, client capture timestamp, packet cadence floor,
/// and the client-owned bundle epoch chosen for this gRPC stream.
/// Outputs: the server playout decision for that packet.
/// Why: bundled webcam media has already been synchronized on the client,
/// so the server should not re-solve cross-stream startup pairing. It only
/// rebases the shared client clock onto a fresh local playout epoch.
#[must_use]
pub fn plan_bundled_pts(
&self,
kind: UpstreamMediaKind,
remote_pts_us: u64,
min_step_us: u64,
bundle_base_remote_pts_us: u64,
bundle_epoch: Instant,
) -> UpstreamPlanDecision {
let mut state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
let session_id = state.session_id;
match kind {
UpstreamMediaKind::Camera => {
state.camera_packet_count = state.camera_packet_count.saturating_add(1);
state
.first_camera_remote_pts_us
.get_or_insert(remote_pts_us);
state.camera_startup_ready = true;
}
UpstreamMediaKind::Microphone => {
state.microphone_packet_count = state.microphone_packet_count.saturating_add(1);
state
.first_microphone_remote_pts_us
.get_or_insert(remote_pts_us);
}
}
update_latest_remote_pts(&mut state, kind, remote_pts_us);
if state.session_base_remote_pts_us.is_none() {
state.session_base_remote_pts_us = Some(bundle_base_remote_pts_us);
state.playout_epoch = Some(bundle_epoch);
state.pairing_anchor_deadline = Some(bundle_epoch);
state.phase = UpstreamSyncPhase::Syncing;
state.last_reason = "client-bundled upstream media epoch established".to_string();
self.pairing_state_notify.notify_waiters();
info!(
session_id,
bundle_base_remote_pts_us, "client-bundled upstream media epoch established"
);
}
let session_base_remote_pts_us = state
.session_base_remote_pts_us
.unwrap_or(bundle_base_remote_pts_us);
if remote_pts_us < session_base_remote_pts_us {
return UpstreamPlanDecision::DropBeforeOverlap;
}
let max_live_lag = upstream_max_live_lag();
let source_lag = source_lag_for_kind(&state, kind, remote_pts_us);
if source_lag > max_live_lag {
match kind {
UpstreamMediaKind::Camera => {
state.stale_video_drops = state.stale_video_drops.saturating_add(1);
state.video_freezes = state.video_freezes.saturating_add(1);
state.last_reason =
"dropped stale bundled video beyond max live lag".to_string();
}
UpstreamMediaKind::Microphone => {
state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
state.last_reason =
"dropped stale bundled audio beyond max live lag".to_string();
}
}
state.phase = UpstreamSyncPhase::Healing;
return UpstreamPlanDecision::DropStale("bundled packet exceeded max live lag");
}
let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us);
let last_slot = match kind {
UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us,
};
if let Some(last_pts_us) = *last_slot
&& local_pts_us <= last_pts_us
{
local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
}
*last_slot = Some(local_pts_us);
let sink_offset_us = self.playout_offset_us(kind);
let epoch = state.playout_epoch.unwrap_or(bundle_epoch);
let mut due_at =
apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us);
let now = Instant::now();
let mut late_by = now.checked_duration_since(due_at).unwrap_or_default();
let playout_delay = upstream_playout_delay().min(max_live_lag);
let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay);
if late_by > reanchor_threshold {
let desired_due_at = now + playout_delay;
let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us);
let recovered_epoch = unoffset_due_at
.checked_sub(Duration::from_micros(local_pts_us))
.unwrap_or(unoffset_due_at);
state.playout_epoch = Some(recovered_epoch);
state.pairing_anchor_deadline = Some(desired_due_at);
state.freshness_reanchors = state.freshness_reanchors.saturating_add(1);
state.phase = UpstreamSyncPhase::Healing;
state.last_reason =
"reanchored bundled upstream playhead to preserve freshness".to_string();
due_at = apply_playout_offset(
recovered_epoch + Duration::from_micros(local_pts_us),
sink_offset_us,
);
late_by = now.checked_duration_since(due_at).unwrap_or_default();
info!(
session_id,
?kind,
local_pts_us,
remote_pts_us,
recovery_buffer_ms = playout_delay.as_millis(),
"bundled upstream media playhead reanchored to preserve freshness"
);
}
if kind == UpstreamMediaKind::Microphone {
self.audio_progress_notify.notify_waiters();
}
UpstreamPlanDecision::Play(PlannedUpstreamPacket {
local_pts_us,
due_at,
late_by,
source_lag,
})
}
/// Hold video until the audio master has at least reached the same capture
/// moment, or until the bounded sync grace is exhausted.
pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool {

View File

@@ -36,6 +36,46 @@ fn shared_playout_epoch_is_reused_across_audio_and_video() {
);
}
#[test]
#[serial(upstream_media_runtime)]
fn bundled_media_uses_client_epoch_without_pairing_wait() {
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
let runtime = runtime_without_offsets();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
let epoch = tokio::time::Instant::now();
let audio = play(runtime.plan_bundled_pts(
super::UpstreamMediaKind::Microphone,
1_000_000,
1,
1_000_000,
epoch,
));
let video = play(runtime.plan_bundled_pts(
super::UpstreamMediaKind::Camera,
1_033_333,
16_666,
1_000_000,
epoch,
));
assert_eq!(audio.local_pts_us, 0);
assert_eq!(video.local_pts_us, 33_333);
assert_eq!(
video.due_at.saturating_duration_since(audio.due_at),
Duration::from_micros(33_333)
);
let snapshot = runtime.snapshot();
assert_eq!(snapshot.latest_microphone_remote_pts_us, Some(1_000_000));
assert_eq!(snapshot.latest_camera_remote_pts_us, Some(1_033_333));
assert_eq!(
snapshot.last_reason,
"client-bundled upstream media epoch established"
);
});
}
#[test]
#[serial(upstream_media_runtime)]
fn pairing_window_holds_one_sided_playout_by_default() {

View File

@@ -12,6 +12,7 @@ path = "src/lib.rs"
[dev-dependencies]
anyhow = "1.0"
async-stream = "0.3"
base64 = "0.22"
chrono = "0.4"
evdev = "0.13"
futures-util = "0.3"

View File

@@ -12,6 +12,7 @@ mod handshake {
pub struct PeerCaps {
pub camera: bool,
pub microphone: bool,
pub bundled_webcam_media: bool,
pub server_version: Option<String>,
}
@@ -19,6 +20,7 @@ mod handshake {
PeerCaps {
camera: std::env::var("LESAVKA_TEST_CAP_CAMERA").is_ok(),
microphone: std::env::var("LESAVKA_TEST_CAP_MIC").is_ok(),
bundled_webcam_media: std::env::var("LESAVKA_TEST_CAP_BUNDLED").is_ok(),
server_version: None,
}
}
@@ -93,7 +95,7 @@ mod relay_transport {
mod input {
pub mod camera {
-use crate::app_support::CameraConfig;
+pub use crate::app_support::CameraConfig;
use lesavka_common::lesavka::VideoPacket;
pub struct CameraCapture;
@@ -103,6 +105,14 @@ mod input {
Ok(Self)
}
pub fn new_with_capture_profile(
source: Option<&str>,
cfg: Option<CameraConfig>,
_profile: Option<(u32, u32, u32)>,
) -> anyhow::Result<Self> {
Self::new(source, cfg)
}
pub fn pull(&self) -> Option<VideoPacket> {
None
}
@@ -119,6 +129,14 @@ mod input {
Ok(Self)
}
pub fn new_default_source() -> anyhow::Result<Self> {
Self::new()
}
pub fn new_with_source(_source: Option<&str>) -> anyhow::Result<Self> {
Self::new()
}
pub fn pull(&self) -> Option<AudioPacket> {
None
}
@@ -196,6 +214,14 @@ mod output {
Ok(Self)
}
pub fn new_default_sink() -> anyhow::Result<Self> {
Self::new()
}
pub fn new_with_sink(_sink: Option<&str>) -> anyhow::Result<Self> {
Self::new()
}
pub fn push(&self, _pkt: AudioPacket) {}
}
}

View File

@@ -41,7 +41,7 @@ mod audio_include_contract {
#[serial]
fn pick_sink_element_prefers_operator_override() {
with_var("LESAVKA_AUDIO_SINK", Some("fakesink sync=false"), || {
-let sink = pick_sink_element().expect("override sink");
+let sink = pick_sink_element(None, true).expect("override sink");
assert_eq!(sink, "fakesink sync=false");
});
}
@@ -53,7 +53,7 @@ mod audio_include_contract {
"LESAVKA_AUDIO_SINK",
Some("alsa_output.pci-0000_00_1f.3.analog-stereo"),
|| {
-let sink = pick_sink_element().expect("device sink");
+let sink = pick_sink_element(None, true).expect("device sink");
assert_eq!(
sink,
"pulsesink device=\"alsa_output.pci-0000_00_1f.3.analog-stereo\" buffer-time=350000 latency-time=100000 sync=true"
@@ -83,7 +83,7 @@ exit 0
"DEFAULT".to_string()
)]
);
-let sink = pick_sink_element().expect("pick sink");
+let sink = pick_sink_element(None, true).expect("pick sink");
assert_eq!(
sink,
"pulsesink device=\"alsa_output.usb-DAC_1234-00.analog-stereo\" buffer-time=350000 latency-time=100000 sync=true"
@@ -99,7 +99,7 @@ exit 0
"LESAVKA_AUDIO_SINK",
Some("bluez_output.80_C3_BA_76_26_AB.1"),
|| {
-let sink = pick_sink_element().expect("bluetooth sink");
+let sink = pick_sink_element(None, true).expect("bluetooth sink");
assert_eq!(
sink,
"pulsesink device=\"bluez_output.80_C3_BA_76_26_AB.1\" buffer-time=750000 latency-time=250000 sync=true"
@@ -125,7 +125,7 @@ exit 0
list_pw_sinks().is_empty(),
"no default sink should be parsed"
);
-let sink = pick_sink_element().expect("fallback sink");
+let sink = pick_sink_element(None, true).expect("fallback sink");
assert_eq!(sink, "autoaudiosink");
});
});
@@ -142,6 +142,7 @@ exit 0
id: 0,
pts: 1_234,
data: vec![0xFF, 0xF1, 0x50, 0x80, 0x00, 0x1F, 0xFC],
..AudioPacket::default()
});
drop(out);
}
@@ -180,6 +181,7 @@ exit 0
id: 0,
pts: 42_666,
data: vec![0xFF, 0xF1, 0x50, 0x80, 0x00, 0x1F, 0xFC],
..AudioPacket::default()
},
&timeline,
);
@@ -356,6 +358,7 @@ exit 1
id: 0,
pts: 1_500,
data: vec![0xFF, 0xF1, 0x50, 0x80, 0x00, 0x1F, 0xFC],
..AudioPacket::default()
},
&timeline,
);

View File

@@ -101,6 +101,7 @@ fn audio_out_constructor_and_push_are_stable() {
id: 0,
pts: 0,
data: Vec::new(),
..AudioPacket::default()
});
}
Err(err) => {

View File

@@ -142,6 +142,7 @@ impl Handshake for SparseHandshakeSvc {
eye_height: 0,
eye_fps: 0,
server_revision: String::new(),
bundled_webcam_media: false,
}))
}
}

View File

@@ -135,6 +135,7 @@ mod tests {
id: 0,
pts: 77,
data: vec![0xFF, 0xF1, 0x50, 0x80, 0x00, 0x1F, 0xFC],
..AudioPacket::default()
});
voice.finish();
}

View File

@@ -107,6 +107,7 @@ mod server_upstream_media {
id: 0,
pts: 12_345,
data: vec![1, 2, 3, 4, 5, 6],
..AudioPacket::default()
})
.await
.expect("send synthetic upstream audio");
@@ -287,6 +288,7 @@ mod server_upstream_media {
id: 0,
pts: 1_000_000,
data: vec![1, 2, 3, 4],
..AudioPacket::default()
})
.await
.expect("send matching audio packet");

View File

@@ -193,6 +193,7 @@ mod server_upstream_media_pairing {
id: 0,
pts: 1_000_000,
data: vec![1, 2, 3, 4],
..AudioPacket::default()
})
.await
.expect("send anchor audio packet");
@@ -254,6 +255,7 @@ mod server_upstream_media_pairing {
id: 0,
pts: 12_345,
data: vec![1, 2, 3, 4, 5, 6],
..AudioPacket::default()
})
.await
.expect("send stale synthetic upstream audio");
@@ -320,6 +322,7 @@ mod server_upstream_media_pairing {
id: 0,
pts: 1_000_000,
data: vec![1, 2, 3, 4],
..AudioPacket::default()
})
.await
.expect("send leading audio packet");
@@ -338,6 +341,7 @@ mod server_upstream_media_pairing {
id: 0,
pts: 1_310_000,
data: vec![5, 6, 7, 8],
..AudioPacket::default()
})
.await
.expect("send post-anchor audio packet");
@@ -419,6 +423,7 @@ mod server_upstream_media_pairing {
id: 0,
pts: 1_300_000,
data: vec![1, 2, 3, 4],
..AudioPacket::default()
})
.await
.expect("send anchor audio packet");

View File

@@ -106,6 +106,7 @@ mod server_upstream_media_pairing {
id: 0,
pts: 12_345,
data: vec![1, 2, 3, 4, 5, 6],
..AudioPacket::default()
})
.await
.expect("send stale synthetic upstream audio");
@@ -170,6 +171,7 @@ mod server_upstream_media_pairing {
id: 0,
pts: 1_000_000,
data: vec![1, 2, 3, 4],
..AudioPacket::default()
})
.await
.expect("send first audio packet");