media: clamp future capture pts and live-switch devices

This commit is contained in:
Brad Stein 2026-05-02 10:31:22 -03:00
parent db83f24dde
commit 609517de03
20 changed files with 632 additions and 120 deletions

View File

@ -154,7 +154,7 @@ Context: the mirrored browser probe finally reproduced the real failure class on
- [ ] Surface `Starting`, `Healing`, `Flowing`, `Lagging`, `Dropping`, and `Stale` states in chips/diagnostics from real path evidence.
### Phase 4: Recovery And Mid-Session Changes
- [ ] Make device changes trigger soft-pause, stream replacement, queue flush, and re-pairing.
- [x] Make device changes trigger soft-pause, stream replacement, queue flush, and re-pairing.
- [ ] Keep recovery soft-first; reserve hard UVC/UAC gadget rebuilds for explicit guarded recoveries.
- [ ] Add cooldown/state guards so recovery buttons cannot wedge Theia.
- [ ] Ensure disconnect closes all client/server media tasks for the session.
@ -229,3 +229,5 @@ Context: 0.16.x proved that queue tweaks and static calibration cannot guarantee
- 2026-05-02: 0.17.3 Google Meet manual test improved to roughly sub-second/near-quarter-second lip sync, but the mirrored analyzer could not pair pulses and the user still heard choppy background audio. Client logs showed Pulse microphone packets arriving unevenly with ages around `90-240ms`; patch 0.17.4 lowers Pulse mic `buffer-time`/`latency-time`, bounds the mic queue/appsink, and keeps mirrored-probe after-run planner diagnostics even when analysis fails.
- 2026-05-02: 0.17.4 mirrored run was salvageable after an SCP banner timeout, but analysis still failed with no close pulse pairs. The client log still showed `180-240ms` microphone delivery ages, pointing at server playout sleeps backpressuring the gRPC microphone stream. Patch 0.17.5 drains inbound microphone packets while waiting for scheduled UAC playout and retries browser-capture SCP fetches.
- 2026-05-02: 0.17.5 mirrored run still failed with insufficient paired evidence, and the client log still showed recurring `180-240ms` microphone packet age while camera age stayed near zero. Patch 0.17.6 splits oversized mic samples into `20ms` timestamped packets and keeps a short fresh server-side audio window instead of collapsing every pending burst to one newest chunk, aiming to preserve lip sync without making background audio choppy.
- 2026-05-02: 0.17.6 Bumblebee mirrored run proved Bumblebee mic packets are already `10ms`, but camera source timestamps were being rebased up to roughly `1.8s` into the future while mic packets sat around `180-240ms` old. Patch 0.17.7 adds a source lead cap (`80ms` default) to both direct and duration-paced client timestamp rebasing so bursty camera buffers cannot make the server wait for fake future video while fresh audio keeps moving.
- 2026-05-02: The launcher UI was still writing live control files with only camera/mic/speaker booleans, so media device combo changes were effectively only staged for the next child launch. Patch 0.17.7 extends the live media control file with base64-encoded camera source, camera profile, microphone source, and speaker sink choices; the relay child now rebuilds the affected camera, mic, or speaker pipeline when those selections change.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.17.6"
version = "0.17.7"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.17.6"
version = "0.17.7"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.17.6"
version = "0.17.7"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.17.6"
version = "0.17.7"
edition = "2024"
[dependencies]

View File

@ -70,7 +70,6 @@ impl LesavkaClientApp {
#[cfg(not(coverage))]
async fn audio_loop(
ep: Channel,
out: AudioOut,
media_controls: crate::live_media_control::LiveMediaControls,
) {
let mut consecutive_source_failures = 0_u32;
@ -92,6 +91,24 @@ impl LesavkaClientApp {
paused = false;
delay = Duration::from_secs(1);
}
let audio_sink_choice = media_controls.refresh().audio_sink;
let active_sink = audio_sink_choice.resolve(None);
let out = match audio_sink_choice {
crate::live_media_control::MediaDeviceChoice::Auto => AudioOut::new_default_sink(),
crate::live_media_control::MediaDeviceChoice::Inherit => AudioOut::new(),
crate::live_media_control::MediaDeviceChoice::Selected(ref sink) => {
AudioOut::new_with_sink(Some(sink))
}
};
let out = match out {
Ok(out) => out,
Err(err) => {
audio_failure_log.record("sink", &err.to_string());
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
};
let mut cli = RelayClient::new(ep.clone());
let req = MonitorRequest {
id: 0,
@ -108,10 +125,20 @@ impl LesavkaClientApp {
let mut warned_no_packets = false;
delay = Duration::from_secs(1);
loop {
if !media_controls.refresh().audio {
let state = media_controls.refresh();
if !state.audio {
tracing::info!("🔇 remote audio soft-paused; closing capture stream");
break;
}
let desired_sink = state.audio_sink.resolve(None);
if desired_sink != active_sink {
tracing::info!(
from = active_sink.as_deref().unwrap_or("auto"),
to = desired_sink.as_deref().unwrap_or("auto"),
"🔊 speaker sink changed; restarting live audio output pipeline"
);
break;
}
match tokio::time::timeout(
Duration::from_secs(1),
stream.get_mut().message(),

View File

@ -58,11 +58,19 @@ impl LesavkaClientApp {
let caps = handshake::negotiate(&self.server_addr).await;
tracing::info!("🤝 server capabilities = {:?}", caps);
let camera_cfg = app_support::camera_config_from_caps(&caps);
let initial_cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok();
let initial_cam_profile = initial_camera_profile_id_from_env();
let initial_mic_source = std::env::var("LESAVKA_MIC_SOURCE").ok();
let initial_audio_sink = std::env::var("LESAVKA_AUDIO_SINK").ok();
let media_controls = crate::live_media_control::LiveMediaControls::from_env(
crate::live_media_control::MediaControlState::new(
crate::live_media_control::MediaControlState::with_devices(
caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(),
caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(),
std::env::var("LESAVKA_AUDIO_DISABLE").is_err(),
initial_cam_source.clone(),
initial_cam_profile.clone(),
initial_mic_source.clone(),
initial_audio_sink.clone(),
),
);
let media_state = media_controls.refresh();
@ -213,13 +221,8 @@ impl LesavkaClientApp {
/*────────── audio renderer & puller ───────────*/
if std::env::var("LESAVKA_AUDIO_DISABLE").is_err() {
let audio_out = AudioOut::new()?;
let ep_audio = vid_ep.clone();
tokio::spawn(Self::audio_loop(
ep_audio,
audio_out,
media_controls.clone(),
));
tokio::spawn(Self::audio_loop(ep_audio, media_controls.clone()));
} else {
info!("🔇 remote audio disabled for this relay session");
}
@ -238,80 +241,30 @@ impl LesavkaClientApp {
);
}
let ep = vid_ep.clone();
let cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok();
let cam_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera);
let media_controls = media_controls.clone();
tokio::spawn(async move {
let result = tokio::task::spawn_blocking(move || {
CameraCapture::new(cam_source.as_deref(), camera_cfg)
})
.await;
match result {
Ok(Ok(cam)) => {
let cam = Arc::new(cam);
tokio::spawn(Self::cam_loop(
ep,
cam,
cam_telemetry.clone(),
media_controls.clone(),
initial_cam_source.clone(),
initial_cam_profile.clone(),
camera_cfg,
cam_telemetry,
media_controls,
));
}
Ok(Err(err)) => {
cam_telemetry.record_disconnect(format!(
"webcam uplink setup failed: {err:#}"
));
warn!(
"📸 webcam uplink is unavailable for this relay session; continuing without StreamCamera: {err:#}"
);
}
Err(err) => {
cam_telemetry.record_disconnect(format!(
"webcam uplink setup task failed: {err}"
));
warn!(
"📸 webcam uplink setup task failed before StreamCamera could start: {err}"
);
}
}
});
}
if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() {
let ep = vid_ep.clone();
let mic_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone);
let media_controls = media_controls.clone();
tokio::spawn(async move {
let result = tokio::task::spawn_blocking(MicrophoneCapture::new).await;
match result {
Ok(Ok(mic)) => {
let mic = Arc::new(mic);
tokio::spawn(Self::voice_loop(
ep,
mic,
mic_telemetry.clone(),
media_controls.clone(),
initial_mic_source.clone(),
mic_telemetry,
media_controls,
));
}
Ok(Err(err)) => {
mic_telemetry.record_disconnect(format!(
"microphone uplink setup failed: {err:#}"
));
warn!(
"🎤 microphone uplink is unavailable for this relay session; continuing without StreamMicrophone: {err:#}"
);
}
Err(err) => {
mic_telemetry.record_disconnect(format!(
"microphone uplink setup task failed: {err}"
));
warn!(
"🎤 microphone uplink setup task failed before StreamMicrophone could start: {err}"
);
}
}
});
}
/*────────── central reactor ───────────────────*/
if self.headless {

View File

@ -3,7 +3,7 @@ impl LesavkaClientApp {
#[cfg(not(coverage))]
async fn voice_loop(
ep: Channel,
mic: Arc<MicrophoneCapture>,
initial_source: Option<String>,
telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
media_controls: crate::live_media_control::LiveMediaControls,
) {
@ -11,6 +11,48 @@ impl LesavkaClientApp {
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
loop {
let state = media_controls.refresh();
if !state.microphone {
telemetry.record_enabled(false);
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
let microphone_source_choice = state.microphone_source.clone();
let active_source = microphone_source_choice.resolve(initial_source.as_deref());
let use_default_source = matches!(
microphone_source_choice,
crate::live_media_control::MediaDeviceChoice::Auto
) && active_source.is_none();
let setup_source = active_source.clone();
let result = tokio::task::spawn_blocking(move || {
if use_default_source {
MicrophoneCapture::new_default_source()
} else {
MicrophoneCapture::new_with_source(setup_source.as_deref())
}
})
.await;
let mic = match result {
Ok(Ok(mic)) => Arc::new(mic),
Ok(Err(err)) => {
telemetry.record_disconnect(format!("microphone uplink setup failed: {err:#}"));
warn!(
"🎤 microphone uplink setup failed for {:?}: {err:#}",
active_source.as_deref().unwrap_or("auto")
);
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
Err(err) => {
telemetry.record_disconnect(format!("microphone uplink setup task failed: {err}"));
warn!("🎤 microphone uplink setup task failed before StreamMicrophone could start: {err}");
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
};
telemetry.record_reconnect_attempt();
let mut cli = RelayClient::new(ep.clone());
let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(AUDIO_UPLINK_QUEUE);
@ -55,10 +97,24 @@ impl LesavkaClientApp {
let queue_thread = queue.clone();
let drop_log_thread = Arc::clone(&drop_log);
let media_controls_thread = media_controls.clone();
let initial_source_thread = initial_source.clone();
let active_source_thread = active_source.clone();
let mic_worker = std::thread::spawn(move || {
let mut paused = false;
while stop_rx.try_recv().is_err() {
if !media_controls_thread.refresh().microphone {
let state = media_controls_thread.refresh();
let desired_source = state
.microphone_source
.resolve(initial_source_thread.as_deref());
if desired_source != active_source_thread {
tracing::info!(
from = active_source_thread.as_deref().unwrap_or("auto"),
to = desired_source.as_deref().unwrap_or("auto"),
"🎤 microphone source changed; restarting live uplink pipeline"
);
break;
}
if !state.microphone {
if !paused {
telemetry_thread.record_enabled(false);
tracing::info!("🎤 microphone uplink soft-paused");
@ -123,13 +179,56 @@ impl LesavkaClientApp {
#[cfg(not(coverage))]
async fn cam_loop(
ep: Channel,
cam: Arc<CameraCapture>,
initial_source: Option<String>,
initial_profile: Option<String>,
camera_cfg: Option<crate::input::camera::CameraConfig>,
telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
media_controls: crate::live_media_control::LiveMediaControls,
) {
let mut delay = Duration::from_secs(1);
loop {
let state = media_controls.refresh();
if !state.camera {
telemetry.record_enabled(false);
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
let active_source = state.camera_source.resolve(initial_source.as_deref());
let active_profile = state.camera_profile.resolve(initial_profile.as_deref());
let capture_profile = active_profile
.as_deref()
.and_then(parse_camera_profile_id);
let setup_source = active_source.clone();
let result = tokio::task::spawn_blocking(move || {
CameraCapture::new_with_capture_profile(
setup_source.as_deref(),
camera_cfg,
capture_profile,
)
})
.await;
let cam = match result {
Ok(Ok(cam)) => Arc::new(cam),
Ok(Err(err)) => {
telemetry.record_disconnect(format!("webcam uplink setup failed: {err:#}"));
warn!(
"📸 webcam uplink setup failed for {:?}: {err:#}",
active_source.as_deref().unwrap_or("auto")
);
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
Err(err) => {
telemetry.record_disconnect(format!("webcam uplink setup task failed: {err}"));
warn!("📸 webcam uplink setup task failed before StreamCamera could start: {err}");
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
};
telemetry.record_reconnect_attempt();
let mut cli = RelayClient::new(ep.clone());
let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(VIDEO_UPLINK_QUEUE);
@ -173,21 +272,68 @@ impl LesavkaClientApp {
let queue = queue.clone();
let drop_log = Arc::clone(&drop_log);
let media_controls = media_controls.clone();
let initial_source_thread = initial_source.clone();
let active_source_thread = active_source.clone();
let initial_profile_thread = initial_profile.clone();
let active_profile_thread = active_profile.clone();
move || loop {
if stop_rx.try_recv().is_ok() {
break;
}
if !media_controls.refresh().camera {
let state = media_controls.refresh();
let desired_source =
state.camera_source.resolve(initial_source_thread.as_deref());
let desired_profile =
state.camera_profile.resolve(initial_profile_thread.as_deref());
if desired_source != active_source_thread
|| desired_profile != active_profile_thread
{
tracing::info!(
from = active_source_thread.as_deref().unwrap_or("auto"),
to = desired_source.as_deref().unwrap_or("auto"),
"📸 webcam source changed; restarting live uplink pipeline"
);
break;
}
if !state.camera {
telemetry.record_enabled(false);
tracing::info!("📸 webcam uplink soft-paused");
while stop_rx.try_recv().is_err()
&& !media_controls.refresh().camera
while stop_rx.try_recv().is_err() {
let state = media_controls.refresh();
let desired_source =
state.camera_source.resolve(initial_source_thread.as_deref());
let desired_profile = state
.camera_profile
.resolve(initial_profile_thread.as_deref());
if desired_source != active_source_thread
|| desired_profile != active_profile_thread
{
break;
}
if state.camera {
break;
}
std::thread::sleep(Duration::from_millis(25));
}
if stop_rx.try_recv().is_ok() {
break;
}
let state = media_controls.refresh();
let desired_source =
state.camera_source.resolve(initial_source_thread.as_deref());
let desired_profile = state
.camera_profile
.resolve(initial_profile_thread.as_deref());
if desired_source != active_source_thread
|| desired_profile != active_profile_thread
{
tracing::info!(
from = active_source_thread.as_deref().unwrap_or("auto"),
to = desired_source.as_deref().unwrap_or("auto"),
"📸 webcam source changed while paused; restarting live uplink pipeline"
);
break;
}
telemetry.record_enabled(true);
tracing::info!("📸 webcam uplink resumed");
}
@ -248,6 +394,24 @@ impl LesavkaClientApp {
}
}
#[cfg(not(coverage))]
fn initial_camera_profile_id_from_env() -> Option<String> {
    // A launch-time profile id exists only when the launcher exported all three
    // dimensions; a partial set yields `None` rather than a malformed id.
    let read = |key: &str| std::env::var(key).ok();
    let (width, height, fps) = (
        read("LESAVKA_CAM_WIDTH")?,
        read("LESAVKA_CAM_HEIGHT")?,
        read("LESAVKA_CAM_FPS")?,
    );
    Some(format!("{width}x{height}@{fps}"))
}
#[cfg(not(coverage))]
fn parse_camera_profile_id(raw: &str) -> Option<(u32, u32, u32)> {
    // Expected shape: "<width>x<height>@<fps>", e.g. "1280x720@30".
    let (dimensions, fps_text) = raw.split_once('@')?;
    let (width_text, height_text) = dimensions.split_once('x')?;
    let width: u32 = width_text.parse().ok()?;
    let height: u32 = height_text.parse().ok()?;
    let fps: u32 = fps_text.parse().ok()?;
    // All three components must be non-zero for a usable capture profile.
    if width == 0 || height == 0 || fps == 0 {
        return None;
    }
    Some((width, height, fps))
}
#[cfg(not(coverage))]
const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
crate::uplink_fresh_queue::FreshQueueConfig {

View File

@ -1,5 +1,13 @@
impl CameraCapture {
pub fn new(device_fragment: Option<&str>, cfg: Option<CameraConfig>) -> anyhow::Result<Self> {
// Back-compat constructor: delegates with no capture-profile override, so the
// profile is resolved from `cfg`/environment inside `new_with_capture_profile`.
Self::new_with_capture_profile(device_fragment, cfg, None)
}
pub fn new_with_capture_profile(
device_fragment: Option<&str>,
cfg: Option<CameraConfig>,
capture_profile_override: Option<(u32, u32, u32)>,
) -> anyhow::Result<Self> {
gst::init().ok();
// Select source: V4L2 device or test pattern
@ -41,7 +49,7 @@ impl CameraCapture {
|cfg| matches!(cfg.codec, CameraCodec::Mjpeg),
);
let jpeg_quality = env_u32("LESAVKA_CAM_JPEG_QUALITY", 85).clamp(1, 100);
let capture_profile = resolved_capture_profile(cfg);
let capture_profile = capture_profile_override.unwrap_or_else(|| resolved_capture_profile(cfg));
let (capture_width, capture_height, capture_fps) = capture_profile;
let (width, height, fps) = resolved_output_profile(cfg, capture_profile);
let keyframe_interval = env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps);
@ -328,8 +336,9 @@ fn log_camera_timing_sample(
pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128,
used_source_pts = timing.used_source_pts,
lag_clamped = timing.lag_clamped,
lead_clamped = timing.lead_clamped,
bytes,
"📸 upstream webcam timing sample"
);
}
}
}

View File

@ -48,11 +48,31 @@ pub struct MicrophoneCapture {
impl MicrophoneCapture {
pub fn new() -> Result<Self> {
// Default constructor: no explicit source, environment selection (LESAVKA_MIC_SOURCE) allowed.
Self::new_with_source_and_env(None, true)
}
pub fn new_with_source(source_override: Option<&str>) -> Result<Self> {
// Explicit source override; when `None`, the environment selection still applies.
Self::new_with_source_and_env(source_override, true)
}
pub fn new_default_source() -> Result<Self> {
// Force the platform default source by disallowing the environment selection.
Self::new_with_source_and_env(None, false)
}
fn new_with_source_and_env(
source_override: Option<&str>,
allow_env_source: bool,
) -> Result<Self> {
gst::init().ok(); // idempotent
/* preferred path: pipewiresrc; fallback: pulsesrc ----------------*/
let source_desc = match std::env::var("LESAVKA_MIC_SOURCE") {
Ok(s) if !s.is_empty() => match Self::resolve_source_desc(&s) {
let selected_source = source_override.map(str::to_string).or_else(|| {
allow_env_source
.then(|| std::env::var("LESAVKA_MIC_SOURCE").ok())
.flatten()
});
let source_desc = match selected_source {
Some(s) if !s.is_empty() => match Self::resolve_source_desc(&s) {
Some(desc) => desc,
None => {
warn!("🎤 requested mic '{s}' not found; using default");
@ -166,6 +186,7 @@ impl MicrophoneCapture {
timing.capture_now_us as i128 - timing.packet_pts_us as i128,
used_source_pts = timing.used_source_pts,
lag_clamped = timing.lag_clamped,
lead_clamped = timing.lead_clamped,
bytes = map.len(),
packet_duration_us,
split_packets = packet_count,

View File

@ -569,12 +569,22 @@ fn write_media_control_request_formats_soft_pause_state() {
state.set_camera_channel_enabled(true);
state.set_microphone_channel_enabled(false);
state.set_audio_channel_enabled(true);
state.select_camera(Some("Logitech BRIO".to_string()));
state.select_camera_quality(Some(CameraMode::new(1280, 720, 30)));
state.select_microphone(Some(
"alsa_input.usb-Neat Microphones Bumblebee".to_string(),
));
state.select_speaker(Some("bluez_output.80_C3_BA_76_26_AB.1".to_string()));
write_media_control_request(&path, &state).expect("write media control");
let raw = std::fs::read_to_string(path).expect("read media control");
assert!(raw.contains("camera=1"), "{raw}");
assert!(raw.contains("microphone=0"), "{raw}");
assert!(raw.contains("audio=1"), "{raw}");
assert!(raw.contains("camera_source=b64:"), "{raw}");
assert!(raw.contains("camera_profile=b64:"), "{raw}");
assert!(raw.contains("microphone_source=b64:"), "{raw}");
assert!(raw.contains("audio_sink=b64:"), "{raw}");
}
#[gtk::test]

View File

@ -135,6 +135,31 @@ fn apply_media_control_change(
}
}
#[cfg(not(coverage))]
/// Apply a live media device-selection change by asking the relay child to rebuild that pipeline.
fn apply_live_media_device_change(
    state_snapshot: &LauncherState,
    widgets: &super::ui_components::LauncherWidgets,
    child_proc: &Rc<RefCell<Option<RelayChild>>>,
    feed_label: &str,
) {
    // Only a running relay child can act on a live control request; a failed
    // try_borrow (GTK re-entrancy) is treated the same as "no child running".
    let relay_live = matches!(child_proc.try_borrow(), Ok(guard) if guard.is_some());
    if !relay_live {
        return;
    }
    let path = media_control_path();
    // Tell the user whether the live control file was written or only staged.
    let status = match write_media_control_request(&path, state_snapshot) {
        Ok(()) => format!(
            "{feed_label} selection applied to the live relay; the stream is restarting."
        ),
        Err(err) => format!(
            "{feed_label} selection is staged for the next relay launch, but live device control could not be written: {err}"
        ),
    };
    widgets.status_label.set_text(&status);
}
#[cfg(not(coverage))]
/// Refresh relay capture-power state in the background so GTK stays responsive.
fn request_capture_power_refresh(

View File

@ -12,8 +12,11 @@
.select_microphone(selected_combo_value(&microphone_combo_read));
let relay_live = child_proc.borrow().is_some();
if relay_live {
widgets.status_label.set_text(
"Microphone selection staged for the next relay launch. Use the Mic toggle to soft-pause or resume the current live feed.",
apply_live_media_device_change(
&state.borrow(),
&widgets,
&child_proc,
"Microphone",
);
} else if tests.borrow_mut().is_running(DeviceTestKind::Microphone) {
widgets.status_label.set_text(
@ -41,8 +44,11 @@
tests.borrow_mut().is_running(DeviceTestKind::Microphone);
let relay_live = child_proc.borrow().is_some();
if relay_live {
widgets.status_label.set_text(
"Speaker selection staged for the next relay launch. Speaker gain still applies live.",
apply_live_media_device_change(
&state.borrow(),
&widgets,
&child_proc,
"Speaker",
);
} else if speaker_running || microphone_running {
widgets.status_label.set_text(

View File

@ -31,8 +31,11 @@
.status_label
.set_text(&format!("Camera quality update failed: {err}"));
} else if child_proc.borrow().is_some() {
widgets.status_label.set_text(
"Camera selection staged for the next relay launch. Use the Camera toggle to soft-pause or resume the current live feed.",
apply_live_media_device_change(
&state.borrow(),
&widgets,
&child_proc,
"Camera",
);
} else if preview_was_running {
widgets.status_label.set_text(&format!(
@ -73,8 +76,11 @@
.status_label
.set_text(&format!("Camera quality update failed: {err}"));
} else if child_proc.borrow().is_some() {
widgets.status_label.set_text(
"Camera quality staged for the next relay launch. The live feed keeps its current capture pipeline.",
apply_live_media_device_change(
&state.borrow(),
&widgets,
&child_proc,
"Camera quality",
);
} else if preview_was_running {
widgets.status_label.set_text(&format!(

View File

@ -135,10 +135,14 @@ pub fn write_mic_gain_request(path: &Path, gain_percent: u32) -> Result<()> {
pub fn write_media_control_request(path: &Path, state: &LauncherState) -> Result<()> {
crate::live_media_control::write_media_control_request(
path,
crate::live_media_control::MediaControlState::new(
crate::live_media_control::MediaControlState::with_devices(
state.channels.camera,
state.channels.microphone,
state.channels.audio,
state.devices.camera.clone(),
state.camera_quality.map(|mode| mode.id()),
state.devices.microphone.clone(),
state.devices.speaker.clone(),
),
)?;
Ok(())

View File

@ -5,6 +5,7 @@ use std::time::{Duration, Instant};
static CAPTURE_ORIGIN: OnceLock<Instant> = OnceLock::new();
const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250;
const DEFAULT_SOURCE_LEAD_CAP_MS: u64 = 80;
fn origin() -> Instant {
*CAPTURE_ORIGIN.get_or_init(Instant::now)
@ -71,6 +72,22 @@ pub fn upstream_source_lag_cap() -> Duration {
.unwrap_or_else(|| Duration::from_millis(DEFAULT_SOURCE_LAG_CAP_MS))
}
/// Cap how far source-derived packet timestamps may lead the live capture clock.
///
/// Inputs: none.
/// Outputs: the maximum tolerated future lead for source-based packet PTS.
/// Why: live sources can flush a burst of future-stamped buffers; if those
/// future timestamps escape, the server freezes media waiting for local backlog.
#[must_use]
pub fn upstream_source_lead_cap() -> Duration {
    // An env override wins only when it parses to a positive millisecond count;
    // zero/garbage values fall back to the compiled-in default.
    let override_ms = std::env::var("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .filter(|&value| value > 0);
    match override_ms {
        Some(ms) => Duration::from_millis(ms),
        None => Duration::from_millis(DEFAULT_SOURCE_LEAD_CAP_MS),
    }
}
#[derive(Debug, Default)]
struct SourcePtsRebaserState {
source_base_us: Option<u64>,
@ -102,11 +119,13 @@ pub struct RebasedSourcePts {
pub capture_base_us: Option<u64>,
pub used_source_pts: bool,
pub lag_clamped: bool,
pub lead_clamped: bool,
}
#[derive(Debug, Default)]
struct DurationPacedSourcePtsState {
next_packet_pts_us: Option<u64>,
last_packet_pts_us: Option<u64>,
}
/// Rebase encoded packet timing by anchoring once, then pacing by duration.
@ -162,6 +181,7 @@ impl SourcePtsRebaser {
let mut packet_pts_us = capture_now_us;
let mut used_source_pts = false;
let mut lag_clamped = false;
let mut lead_clamped = false;
if let Some(source_pts_us) = source_pts_us {
let source_base_us = *state.source_base_us.get_or_insert(source_pts_us);
@ -179,6 +199,14 @@ impl SourcePtsRebaser {
packet_pts_us = lag_floor_us;
lag_clamped = true;
}
let lead_ceiling_us =
capture_now_us.saturating_add(
upstream_source_lead_cap().as_micros().min(u64::MAX as u128) as u64,
);
if packet_pts_us > lead_ceiling_us {
packet_pts_us = lead_ceiling_us;
lead_clamped = true;
}
}
if let Some(last_packet_pts_us) = state.last_packet_pts_us
@ -196,6 +224,7 @@ impl SourcePtsRebaser {
capture_base_us: state.capture_base_us,
used_source_pts,
lag_clamped,
lead_clamped,
}
}
}
@ -232,6 +261,19 @@ impl DurationPacedSourcePtsRebaser {
packet_pts_us = lag_floor_us;
rebased.lag_clamped = true;
}
let lead_ceiling_us = rebased
.capture_now_us
.saturating_add(upstream_source_lead_cap().as_micros().min(u64::MAX as u128) as u64);
if packet_pts_us > lead_ceiling_us {
packet_pts_us = lead_ceiling_us;
rebased.lead_clamped = true;
}
if let Some(last_packet_pts_us) = state.last_packet_pts_us
&& packet_pts_us <= last_packet_pts_us
{
packet_pts_us = last_packet_pts_us.saturating_add(1);
}
state.last_packet_pts_us = Some(packet_pts_us);
state.next_packet_pts_us = Some(packet_pts_us.saturating_add(step_us));
rebased.packet_pts_us = packet_pts_us;
rebased
@ -242,7 +284,7 @@ impl DurationPacedSourcePtsRebaser {
mod tests {
use super::{
DurationPacedSourcePtsRebaser, SourcePtsRebaser, capture_pts_us, packet_age,
upstream_source_lag_cap, upstream_timing_trace_enabled,
upstream_source_lag_cap, upstream_source_lead_cap, upstream_timing_trace_enabled,
};
use serial_test::serial;
use std::time::Duration;
@ -306,6 +348,8 @@ mod tests {
assert!(second.packet_pts_us > first.packet_pts_us);
assert!(!first.lag_clamped);
assert!(!second.lag_clamped);
assert!(!first.lead_clamped);
assert!(!second.lead_clamped);
}
#[test]
@ -323,6 +367,23 @@ mod tests {
assert!(second.capture_now_us - second.packet_pts_us <= 2_500);
}
#[test]
#[serial]
fn source_pts_rebaser_clamps_source_lead_when_it_runs_too_far_ahead() {
// Shrink the lead cap to 5ms so a 1s jump in source pts must trigger the clamp.
temp_env::with_var("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS", Some("5"), || {
let rebased = SourcePtsRebaser::default();
let _first =
rebased.rebase_with_lag_cap(Some(1_000_000), 1, Some(Duration::from_millis(250)));
let second =
rebased.rebase_with_lag_cap(Some(2_000_000), 1, Some(Duration::from_millis(250)));
assert!(second.used_source_pts);
assert!(second.lead_clamped);
// Clamped pts may not trail live capture, and may lead it only by cap + slack.
assert!(second.packet_pts_us >= second.capture_now_us);
assert!(second.packet_pts_us <= second.capture_now_us + 5_500);
});
}
#[test]
#[serial]
fn source_pts_rebasers_anchor_each_stream_to_its_own_first_packet_time() {
@ -373,6 +434,18 @@ mod tests {
});
}
#[test]
#[serial]
fn upstream_source_lead_cap_defaults_and_accepts_override() {
// Without the env override the compiled-in 80ms default applies.
temp_env::with_var_unset("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS", || {
assert_eq!(upstream_source_lead_cap(), Duration::from_millis(80));
});
// A parseable positive override replaces the default.
temp_env::with_var("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS", Some("35"), || {
assert_eq!(upstream_source_lead_cap(), Duration::from_millis(35));
});
}
#[test]
#[serial]
fn duration_paced_rebaser_advances_by_packet_duration_when_source_pts_stretch() {
@ -402,4 +475,24 @@ mod tests {
"duration-paced packet pts should never trail live capture by more than the lag cap"
);
}
#[test]
#[serial]
fn duration_paced_rebaser_clamps_when_duration_pacing_runs_future() {
// With a 15ms cap, pacing 50ms packets from source pts outruns real time fast
// enough that the lead clamp must engage within a dozen packets.
temp_env::with_var("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS", Some("15"), || {
let rebased = DurationPacedSourcePtsRebaser::default();
let mut last =
rebased.rebase_with_packet_duration(Some(0), 50_000, Duration::from_millis(250));
for packet_index in 1..12 {
last = rebased.rebase_with_packet_duration(
Some(packet_index * 50_000),
50_000,
Duration::from_millis(250),
);
}
assert!(last.lead_clamped);
// 16ms bound = 15ms cap plus slack for monotonic nudges and timing jitter.
assert!(last.packet_pts_us <= last.capture_now_us + 16_000);
});
}
}

View File

@ -7,23 +7,80 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
use base64::{Engine as _, engine::general_purpose::STANDARD as B64};
pub const MEDIA_CONTROL_ENV: &str = "LESAVKA_MEDIA_CONTROL";
pub const DEFAULT_MEDIA_CONTROL_PATH: &str = "/tmp/lesavka-media.control";
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum MediaDeviceChoice {
    /// Keep whatever device the session started with (the caller's fallback).
    Inherit,
    /// Explicitly use the platform default device.
    Auto,
    /// Use a specific, named device.
    Selected(String),
}
impl MediaDeviceChoice {
    /// Map a launcher selection into a choice; absent or blank selections mean `Auto`.
    #[must_use]
    pub fn from_selection(selection: Option<String>) -> Self {
        match selection {
            Some(value) if !value.trim().is_empty() => Self::Selected(value),
            _ => Self::Auto,
        }
    }
    /// Resolve the choice to a concrete device name, using `fallback` for `Inherit`.
    #[must_use]
    pub fn resolve(&self, fallback: Option<&str>) -> Option<String> {
        match self {
            Self::Inherit => fallback.map(str::to_string),
            Self::Auto => None,
            Self::Selected(value) => Some(value.clone()),
        }
    }
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct MediaControlState {
/// Soft-pause toggle for the webcam uplink channel.
pub camera: bool,
/// Soft-pause toggle for the microphone uplink channel.
pub microphone: bool,
/// Soft-pause toggle for remote audio playback.
pub audio: bool,
/// Device selections; `Inherit` keeps the value the session launched with.
pub camera_source: MediaDeviceChoice,
pub camera_profile: MediaDeviceChoice,
pub microphone_source: MediaDeviceChoice,
pub audio_sink: MediaDeviceChoice,
}
impl MediaControlState {
#[must_use]
pub const fn new(camera: bool, microphone: bool, audio: bool) -> Self {
pub fn new(camera: bool, microphone: bool, audio: bool) -> Self {
Self {
camera,
microphone,
audio,
camera_source: MediaDeviceChoice::Inherit,
camera_profile: MediaDeviceChoice::Inherit,
microphone_source: MediaDeviceChoice::Inherit,
audio_sink: MediaDeviceChoice::Inherit,
}
}
#[must_use]
/// Build a control state from channel toggles plus optional device selections.
pub fn with_devices(
    camera: bool,
    microphone: bool,
    audio: bool,
    camera_source: Option<String>,
    camera_profile: Option<String>,
    microphone_source: Option<String>,
    audio_sink: Option<String>,
) -> Self {
    // Each selection collapses to `Auto` when absent or blank (see from_selection);
    // the boolean toggles pass through unchanged.
    let camera_source = MediaDeviceChoice::from_selection(camera_source);
    let camera_profile = MediaDeviceChoice::from_selection(camera_profile);
    let microphone_source = MediaDeviceChoice::from_selection(microphone_source);
    let audio_sink = MediaDeviceChoice::from_selection(audio_sink);
    Self {
        camera,
        microphone,
        audio,
        camera_source,
        camera_profile,
        microphone_source,
        audio_sink,
    }
}
}
@ -63,11 +120,11 @@ impl LiveMediaControls {
{
inner.state = state;
}
inner.state
inner.state.clone()
}
}
/// Writes one atomic-ish soft-pause request for the running relay child to poll.
/// Writes one atomic-ish soft-pause/device request for the running relay child to poll.
pub(crate) fn write_media_control_request(
path: &Path,
state: MediaControlState,
@ -75,11 +132,15 @@ pub(crate) fn write_media_control_request(
fs::write(
path,
format!(
"camera={} microphone={} audio={} {}\n",
"camera={} microphone={} audio={} camera_source={} camera_profile={} microphone_source={} audio_sink={} nonce={}\n",
bool_flag(state.camera),
bool_flag(state.microphone),
bool_flag(state.audio),
control_request_nonce()
encode_choice(&state.camera_source),
encode_choice(&state.camera_profile),
encode_choice(&state.microphone_source),
encode_choice(&state.audio_sink),
control_request_nonce(),
),
)
}
@ -89,6 +150,10 @@ fn parse_media_control_state(raw: &str) -> Option<MediaControlState> {
let mut camera = None;
let mut microphone = None;
let mut audio = None;
let mut camera_source = MediaDeviceChoice::Inherit;
let mut camera_profile = MediaDeviceChoice::Inherit;
let mut microphone_source = MediaDeviceChoice::Inherit;
let mut audio_sink = MediaDeviceChoice::Inherit;
for token in raw.split_ascii_whitespace() {
let Some((key, value)) = token.split_once('=') else {
continue;
@ -97,6 +162,12 @@ fn parse_media_control_state(raw: &str) -> Option<MediaControlState> {
"camera" => camera = Some(parse_bool_flag(value)?),
"microphone" | "mic" => microphone = Some(parse_bool_flag(value)?),
"audio" | "speaker" => audio = Some(parse_bool_flag(value)?),
"camera_source" | "camera_source_b64" => camera_source = parse_choice(value)?,
"camera_profile" | "camera_quality" => camera_profile = parse_choice(value)?,
"microphone_source" | "mic_source" | "microphone_source_b64" => {
microphone_source = parse_choice(value)?;
}
"audio_sink" | "speaker_sink" | "audio_sink_b64" => audio_sink = parse_choice(value)?,
_ => {}
}
}
@ -104,9 +175,37 @@ fn parse_media_control_state(raw: &str) -> Option<MediaControlState> {
camera: camera?,
microphone: microphone?,
audio: audio?,
camera_source,
camera_profile,
microphone_source,
audio_sink,
})
}
/// Serializes a device choice to its on-disk token: `inherit`, `auto`, or a
/// `b64:`-prefixed base64 payload for explicit selections (so device names
/// with spaces survive the whitespace-delimited control file).
fn encode_choice(choice: &MediaDeviceChoice) -> String {
    match choice {
        MediaDeviceChoice::Selected(value) => {
            let encoded = B64.encode(value.as_bytes());
            format!("b64:{encoded}")
        }
        MediaDeviceChoice::Auto => String::from("auto"),
        MediaDeviceChoice::Inherit => String::from("inherit"),
    }
}
/// Parses a serialized device-choice token.
///
/// Accepts blank or `auto` (-> `Auto`), `inherit` (-> `Inherit`), a
/// `b64:`-prefixed base64 payload, or a raw device name; the latter two go
/// through `MediaDeviceChoice::from_selection`. Returns `None` only when a
/// `b64:` payload fails to decode to UTF-8.
fn parse_choice(value: &str) -> Option<MediaDeviceChoice> {
    let token = value.trim();
    if token.is_empty() || token.eq_ignore_ascii_case("auto") {
        return Some(MediaDeviceChoice::Auto);
    }
    if token.eq_ignore_ascii_case("inherit") {
        return Some(MediaDeviceChoice::Inherit);
    }
    match token.strip_prefix("b64:") {
        Some(encoded) => {
            let bytes = B64.decode(encoded).ok()?;
            let decoded = String::from_utf8(bytes).ok()?;
            Some(MediaDeviceChoice::from_selection(Some(decoded)))
        }
        None => Some(MediaDeviceChoice::from_selection(Some(token.to_string()))),
    }
}
fn parse_bool_flag(value: &str) -> Option<bool> {
match value.trim().to_ascii_lowercase().as_str() {
"1" | "true" | "on" | "yes" => Some(true),
@ -138,6 +237,37 @@ mod tests {
);
}
#[test]
fn parses_media_control_state_with_live_device_choices() {
    // Round-trip: a state with explicit device choices must survive the
    // write-to-file / parse-from-file cycle unchanged.
    let expected = MediaControlState::with_devices(
        true,
        true,
        true,
        Some("Logitech BRIO".to_string()),
        Some("1280x720@30".to_string()),
        Some("alsa_input.usb-Neat Microphones".to_string()),
        None,
    );
    let dir = tempfile::tempdir().expect("tempdir");
    let control_path = dir.path().join("media.control");
    write_media_control_request(&control_path, expected.clone()).expect("write controls");
    let raw = fs::read_to_string(&control_path).expect("read controls");
    assert_eq!(parse_media_control_state(&raw), Some(expected));
}
#[test]
fn device_choices_resolve_inherit_auto_and_selected() {
    let fallback = Some("env-device");
    // Inherit keeps the fallback device.
    assert_eq!(
        MediaDeviceChoice::Inherit.resolve(fallback),
        Some("env-device".to_string())
    );
    // Auto discards the fallback so the caller auto-detects.
    assert_eq!(MediaDeviceChoice::Auto.resolve(fallback), None);
    // An explicit selection wins over the fallback.
    let selected = MediaDeviceChoice::Selected("chosen".to_string());
    assert_eq!(selected.resolve(fallback), Some("chosen".to_string()));
}
#[test]
fn live_media_controls_refresh_after_file_changes() {
let dir = tempfile::tempdir().expect("tempdir");

View File

@ -35,8 +35,23 @@ struct AudioTimeline {
impl AudioOut {
/// Opens the default audio output: no explicit sink override, with the
/// `LESAVKA_AUDIO_SINK` environment override honoured (allow_env_sink=true).
pub fn new() -> anyhow::Result<Self> {
    Self::new_with_sink_and_env(None, true)
}
/// Opens an audio output preferring the given sink override (from live
/// media controls); a `None`/blank override falls back to the
/// `LESAVKA_AUDIO_SINK` environment variable, then sink discovery.
pub fn new_with_sink(sink_override: Option<&str>) -> anyhow::Result<Self> {
    Self::new_with_sink_and_env(sink_override, true)
}
/// Opens an audio output that ignores the `LESAVKA_AUDIO_SINK` environment
/// override entirely (allow_env_sink=false) and relies on sink discovery.
pub fn new_default_sink() -> anyhow::Result<Self> {
    Self::new_with_sink_and_env(None, false)
}
fn new_with_sink_and_env(
sink_override: Option<&str>,
allow_env_sink: bool,
) -> anyhow::Result<Self> {
gst::init().context("initialising GStreamer")?;
let sink = pick_sink_element()?;
let sink = pick_sink_element(sink_override, allow_env_sink)?;
let tee_dump = std::env::var("LESAVKA_TAP_AUDIO")
.ok()
.as_deref()
@ -261,8 +276,13 @@ impl Drop for AudioOut {
}
#[cfg(not(coverage))]
fn pick_sink_element() -> Result<String> {
if let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") {
fn pick_sink_element(sink_override: Option<&str>, allow_env_sink: bool) -> Result<String> {
if let Some(s) = sink_override.filter(|value| !value.trim().is_empty()) {
let sink = normalize_sink_override(s);
info!("💪 sink overridden via live media control={s} -> {sink}");
return Ok(sink);
}
if allow_env_sink && let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") {
let sink = normalize_sink_override(&s);
info!(
"💪 sink overridden via LESAVKA_AUDIO_SINK={} -> {}",
@ -280,8 +300,11 @@ fn pick_sink_element() -> Result<String> {
}
#[cfg(coverage)]
fn pick_sink_element() -> Result<String> {
if let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") {
fn pick_sink_element(sink_override: Option<&str>, allow_env_sink: bool) -> Result<String> {
if let Some(s) = sink_override.filter(|value| !value.trim().is_empty()) {
return Ok(normalize_sink_override(s));
}
if allow_env_sink && let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") {
return Ok(normalize_sink_override(&s));
}
if let Some((n, _)) = list_pw_sinks().first() {

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.17.6"
version = "0.17.7"
edition = "2024"
build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.17.6"
version = "0.17.7"
edition = "2024"
autobins = false

View File

@ -465,6 +465,7 @@ mod camera_include_contract {
capture_base_us: Some(7_345),
used_source_pts: true,
lag_clamped: false,
lead_clamped: false,
},
256,
);

View File

@ -10,6 +10,7 @@ use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
// Default cap on how far a rebased source PTS may lag the local capture
// clock, in milliseconds (overridable via LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS).
const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250;
// Default cap on how far a rebased source PTS may lead (run ahead of) the
// local capture clock, in milliseconds — this commit's "clamp future capture
// pts" bound (overridable via LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS).
const DEFAULT_SOURCE_LEAD_CAP_MS: u64 = 80;
fn capture_clock_origin() -> &'static Instant {
static ORIGIN: OnceLock<Instant> = OnceLock::new();
@ -46,6 +47,15 @@ pub fn upstream_source_lag_cap() -> Duration {
.unwrap_or_else(|| Duration::from_millis(DEFAULT_SOURCE_LAG_CAP_MS))
}
/// Returns the maximum amount a rebased source PTS may lead the local
/// capture clock.
///
/// Reads `LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS` (trimmed, parsed as `u64`,
/// zero rejected); falls back to `DEFAULT_SOURCE_LEAD_CAP_MS`.
pub fn upstream_source_lead_cap() -> Duration {
    let override_ms = std::env::var("LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .filter(|&ms| ms > 0);
    Duration::from_millis(override_ms.unwrap_or(DEFAULT_SOURCE_LEAD_CAP_MS))
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct RebasedSourcePts {
pub packet_pts_us: u64,
@ -55,6 +65,7 @@ pub struct RebasedSourcePts {
pub capture_base_us: Option<u64>,
pub used_source_pts: bool,
pub lag_clamped: bool,
pub lead_clamped: bool,
}
#[derive(Debug, Default)]
@ -72,6 +83,7 @@ pub struct SourcePtsRebaser {
#[derive(Debug, Default)]
struct DurationPacedSourcePtsState {
    // PTS expected for the next packet (previous packet PTS + step), in µs.
    next_packet_pts_us: Option<u64>,
    // PTS actually assigned to the most recent packet, in µs; used to keep
    // output PTS strictly monotonic (colliding packets get bumped by 1 µs).
    last_packet_pts_us: Option<u64>,
}
#[derive(Debug, Default)]
@ -99,6 +111,7 @@ impl SourcePtsRebaser {
let mut packet_pts_us = capture_now_us;
let mut used_source_pts = false;
let mut lag_clamped = false;
let mut lead_clamped = false;
if let Some(source_pts_us) = source_pts_us {
let source_base_us = *state.source_base_us.get_or_insert(source_pts_us);
@ -115,6 +128,15 @@ impl SourcePtsRebaser {
packet_pts_us = lag_floor_us;
lag_clamped = true;
}
let lead_ceiling_us = capture_now_us.saturating_add(
upstream_source_lead_cap()
.as_micros()
.min(u64::MAX as u128) as u64,
);
if packet_pts_us > lead_ceiling_us {
packet_pts_us = lead_ceiling_us;
lead_clamped = true;
}
}
if let Some(last_packet_pts_us) = state.last_packet_pts_us
@ -132,6 +154,7 @@ impl SourcePtsRebaser {
capture_base_us: state.capture_base_us,
used_source_pts,
lag_clamped,
lead_clamped,
}
}
}
@ -159,6 +182,21 @@ impl DurationPacedSourcePtsRebaser {
packet_pts_us = lag_floor_us;
rebased.lag_clamped = true;
}
let lead_ceiling_us = rebased.capture_now_us.saturating_add(
upstream_source_lead_cap()
.as_micros()
.min(u64::MAX as u128) as u64,
);
if packet_pts_us > lead_ceiling_us {
packet_pts_us = lead_ceiling_us;
rebased.lead_clamped = true;
}
if let Some(last_packet_pts_us) = state.last_packet_pts_us
&& packet_pts_us <= last_packet_pts_us
{
packet_pts_us = last_packet_pts_us.saturating_add(1);
}
state.last_packet_pts_us = Some(packet_pts_us);
state.next_packet_pts_us = Some(packet_pts_us.saturating_add(step_us));
rebased.packet_pts_us = packet_pts_us;
rebased