lesavka/client/src/app/uplink_media.rs

619 lines
27 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

impl LesavkaClientApp {
/*──────────────── mic stream ─────────────────*/
#[cfg(not(coverage))]
/// Drive the microphone uplink for the lifetime of the client.
///
/// Each pass of the outer loop:
/// 1. polls the live media controls every 100 ms until the microphone is enabled,
/// 2. builds a `MicrophoneCapture` on a blocking task for the currently resolved source,
/// 3. opens a `stream_microphone` call whose request stream is fed from a
///    `FreshPacketQueue`,
/// 4. runs a capture worker thread that pulls packets and pushes them into that queue,
/// 5. drains server responses until the response stream ends, then stops the worker
///    and starts over.
///
/// Setup and connect failures grow `delay` through `app_support::next_delay`; a
/// successful connect resets it to one second.
///
/// * `ep` - channel cloned into a fresh `RelayClient` for every attempt.
/// * `initial_source` - startup source handed to `resolve` whenever the live
///   microphone source choice is evaluated.
/// * `telemetry` - handle that receives enable/connect/disconnect/queue events.
/// * `media_controls` - live controls re-read via `refresh()` by this loop and by
///   the capture worker.
async fn voice_loop(
    ep: Channel,
    initial_source: Option<String>,
    telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
    media_controls: crate::live_media_control::LiveMediaControls,
) {
    let mut delay = Duration::from_secs(1);
    // Process-wide counter: only the first connect failure is logged at WARN.
    static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
    loop {
        let state = media_controls.refresh();
        if !state.microphone {
            telemetry.record_enabled(false);
            tokio::time::sleep(Duration::from_millis(100)).await;
            continue;
        }
        let microphone_source_choice = state.microphone_source.clone();
        let active_source = microphone_source_choice.resolve(initial_source.as_deref());
        // Only an `Auto` choice that resolved to no concrete source uses the default constructor.
        let use_default_source = matches!(
            microphone_source_choice,
            crate::live_media_control::MediaDeviceChoice::Auto
        ) && active_source.is_none();
        let setup_source = active_source.clone();
        // Capture construction runs on the blocking pool rather than on the async task.
        let result = tokio::task::spawn_blocking(move || {
            if use_default_source {
                MicrophoneCapture::new_default_source()
            } else {
                MicrophoneCapture::new_with_source(setup_source.as_deref())
            }
        })
        .await;
        let mic = match result {
            Ok(Ok(mic)) => Arc::new(mic),
            Ok(Err(err)) => {
                telemetry.record_disconnect(format!("microphone uplink setup failed: {err:#}"));
                warn!(
                    "🎤 microphone uplink setup failed for {:?}: {err:#}",
                    active_source.as_deref().unwrap_or("auto")
                );
                abort_if_required_media_source_failed(
                    "microphone",
                    "🎤",
                    active_source.as_deref(),
                    &err,
                );
                delay = app_support::next_delay(delay);
                tokio::time::sleep(delay).await;
                continue;
            }
            // The blocking setup task itself failed to complete.
            Err(err) => {
                telemetry.record_disconnect(format!("microphone uplink setup task failed: {err}"));
                warn!("🎤 microphone uplink setup task failed before StreamMicrophone could start: {err}");
                abort_if_required_media_source_failed(
                    "microphone",
                    "🎤",
                    active_source.as_deref(),
                    &err,
                );
                delay = app_support::next_delay(delay);
                tokio::time::sleep(delay).await;
                continue;
            }
        };
        telemetry.record_reconnect_attempt();
        let mut cli = RelayClient::new(ep.clone());
        let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(AUDIO_UPLINK_QUEUE);
        let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new(
            "microphone",
            "🎤",
        )));
        let queue_stream = queue.clone();
        let telemetry_stream = telemetry.clone();
        let drop_log_stream = Arc::clone(&drop_log);
        // Request stream: forwards queued packets until `pop_fresh` yields no packet.
        let outbound = async_stream::stream! {
            loop {
                let next = queue_stream.pop_fresh().await;
                if next.dropped_stale > 0 {
                    telemetry_stream.record_stale_drop(next.dropped_stale);
                    log_uplink_drop(
                        &drop_log_stream,
                        UplinkDropReason::Stale,
                        next.dropped_stale,
                        next.queue_depth,
                        duration_ms(next.delivery_age),
                    );
                }
                if let Some(mut packet) = next.packet {
                    telemetry_stream.record_streamed(
                        queue_depth_u32(next.queue_depth),
                        duration_ms(next.delivery_age),
                    );
                    attach_audio_timing_metadata(
                        &mut packet,
                        next.queue_depth,
                        next.delivery_age,
                    );
                    yield packet;
                    continue;
                }
                break;
            }
        };
        match cli.stream_microphone(Request::new(outbound)).await {
            Ok(mut resp) => {
                let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
                let mic_clone = mic.clone();
                let telemetry_thread = telemetry.clone();
                let queue_thread = queue.clone();
                let drop_log_thread = Arc::clone(&drop_log);
                let media_controls_thread = media_controls.clone();
                let initial_source_thread = initial_source.clone();
                let active_source_thread = active_source.clone();
                // Capture worker: runs on its own OS thread until told to stop or
                // until the selected source no longer matches the running capture.
                let mic_worker = std::thread::spawn(move || {
                    let mut paused = false;
                    while stop_rx.try_recv().is_err() {
                        let state = media_controls_thread.refresh();
                        let desired_source = state
                            .microphone_source
                            .resolve(initial_source_thread.as_deref());
                        if desired_source != active_source_thread {
                            tracing::info!(
                                from = active_source_thread.as_deref().unwrap_or("auto"),
                                to = desired_source.as_deref().unwrap_or("auto"),
                                "🎤 microphone source changed; restarting live uplink pipeline"
                            );
                            break;
                        }
                        if !state.microphone {
                            // Soft pause: keep the stream open but stop pulling packets.
                            if !paused {
                                telemetry_thread.record_enabled(false);
                                tracing::info!("🎤 microphone uplink soft-paused");
                                paused = true;
                            }
                            std::thread::sleep(Duration::from_millis(20));
                            continue;
                        }
                        if paused {
                            telemetry_thread.record_enabled(true);
                            tracing::info!("🎤 microphone uplink resumed");
                            paused = false;
                        }
                        if let Some(pkt) = mic_clone.pull() {
                            trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len());
                            let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
                            let stats = queue_thread.push(pkt, enqueue_age);
                            if stats.dropped_queue_full > 0 {
                                telemetry_thread.record_queue_full_drop(stats.dropped_queue_full);
                                log_uplink_drop(
                                    &drop_log_thread,
                                    UplinkDropReason::QueueFull,
                                    stats.dropped_queue_full,
                                    stats.queue_depth,
                                    duration_ms(enqueue_age),
                                );
                            }
                            telemetry_thread.record_enqueue(
                                queue_depth_u32(stats.queue_depth),
                                duration_ms(enqueue_age),
                                0.0,
                            );
                        }
                    }
                    // No more packets will be produced once the worker leaves its
                    // loop. Closing the queue here lets the outbound request stream
                    // finish, so a source change actually tears the call down instead
                    // of leaving it parked in `pop_fresh`. On the stop-signal path the
                    // queue has already been closed by the caller; closing again is
                    // harmless (the caller itself closes it twice).
                    queue_thread.close();
                });
                delay = Duration::from_secs(1);
                telemetry.record_connected();
                // Drain server messages; the loop ends when the response stream yields nothing more.
                while resp.get_mut().message().await.transpose().is_some() {}
                telemetry.record_disconnect("microphone uplink stream ended");
                queue.close();
                let _ = stop_tx.send(());
                let _ = mic_worker.join();
            }
            Err(e) => {
                telemetry.record_disconnect(format!("microphone uplink connect failed: {e}"));
                if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
                    warn!("❌🎤 connect failed: {e}");
                    warn!("⚠️🎤 further microphonestream failures will be logged at DEBUG");
                } else {
                    debug!("❌🎤 reconnect failed: {e}");
                }
                delay = app_support::next_delay(delay);
            }
        }
        queue.close();
        tokio::time::sleep(delay).await;
    }
}
/*──────────────── cam stream ───────────────────*/
#[cfg(not(coverage))]
/// Drive the webcam uplink until the server reports the RPC as unimplemented.
///
/// Each pass of the outer loop:
/// 1. polls the live media controls every 100 ms until the camera is enabled,
/// 2. builds a `CameraCapture` on a blocking task for the resolved source and profile,
/// 3. opens a `stream_camera` call whose request stream is fed from a
///    `FreshPacketQueue`,
/// 4. runs a capture worker thread that pulls frames and pushes them into that queue,
/// 5. drains server responses until the response stream ends, then stops the worker
///    and starts over.
///
/// Setup and connect failures grow `delay` through `app_support::next_delay`; a
/// successful connect resets it to one second. A `tonic::Code::Unimplemented`
/// connect error makes the function return for good.
///
/// * `ep` - channel cloned into a fresh `RelayClient` for every attempt.
/// * `initial_source` / `initial_profile` - startup values handed to `resolve`
///   whenever the live camera source/profile choices are evaluated.
/// * `camera_cfg` - optional configuration forwarded to the capture constructor.
/// * `telemetry` - handle that receives enable/connect/disconnect/queue events.
/// * `media_controls` - live controls re-read via `refresh()` by this loop and by
///   the capture worker.
async fn cam_loop(
    ep: Channel,
    initial_source: Option<String>,
    initial_profile: Option<String>,
    camera_cfg: Option<crate::input::camera::CameraConfig>,
    telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
    media_controls: crate::live_media_control::LiveMediaControls,
) {
    let mut delay = Duration::from_secs(1);
    loop {
        let state = media_controls.refresh();
        if !state.camera {
            telemetry.record_enabled(false);
            tokio::time::sleep(Duration::from_millis(100)).await;
            continue;
        }
        let active_source = state.camera_source.resolve(initial_source.as_deref());
        let active_profile = state.camera_profile.resolve(initial_profile.as_deref());
        // A profile id that does not parse leaves the capture profile unset.
        let capture_profile = active_profile
            .as_deref()
            .and_then(parse_camera_profile_id);
        let setup_source = active_source.clone();
        // Capture construction runs on the blocking pool rather than on the async task.
        let result = tokio::task::spawn_blocking(move || {
            CameraCapture::new_with_capture_profile(
                setup_source.as_deref(),
                camera_cfg,
                capture_profile,
            )
        })
        .await;
        let cam = match result {
            Ok(Ok(cam)) => Arc::new(cam),
            Ok(Err(err)) => {
                telemetry.record_disconnect(format!("webcam uplink setup failed: {err:#}"));
                warn!(
                    "📸 webcam uplink setup failed for {:?}: {err:#}",
                    active_source.as_deref().unwrap_or("auto")
                );
                abort_if_required_media_source_failed(
                    "camera",
                    "📸",
                    active_source.as_deref(),
                    &err,
                );
                delay = app_support::next_delay(delay);
                tokio::time::sleep(delay).await;
                continue;
            }
            // The blocking setup task itself failed to complete.
            Err(err) => {
                telemetry.record_disconnect(format!("webcam uplink setup task failed: {err}"));
                warn!("📸 webcam uplink setup task failed before StreamCamera could start: {err}");
                abort_if_required_media_source_failed(
                    "camera",
                    "📸",
                    active_source.as_deref(),
                    &err,
                );
                delay = app_support::next_delay(delay);
                tokio::time::sleep(delay).await;
                continue;
            }
        };
        telemetry.record_reconnect_attempt();
        let mut cli = RelayClient::new(ep.clone());
        let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(VIDEO_UPLINK_QUEUE);
        let drop_log =
            Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new("camera", "📸")));
        let queue_stream = queue.clone();
        let telemetry_stream = telemetry.clone();
        let drop_log_stream = Arc::clone(&drop_log);
        // Request stream: forwards queued packets until `pop_fresh` yields no packet.
        let outbound = async_stream::stream! {
            loop {
                let next = queue_stream.pop_fresh().await;
                if next.dropped_stale > 0 {
                    telemetry_stream.record_stale_drop(next.dropped_stale);
                    log_uplink_drop(
                        &drop_log_stream,
                        UplinkDropReason::Stale,
                        next.dropped_stale,
                        next.queue_depth,
                        duration_ms(next.delivery_age),
                    );
                }
                if let Some(mut packet) = next.packet {
                    telemetry_stream.record_streamed(
                        queue_depth_u32(next.queue_depth),
                        duration_ms(next.delivery_age),
                    );
                    attach_video_timing_metadata(
                        &mut packet,
                        next.queue_depth,
                        next.delivery_age,
                    );
                    yield packet;
                    continue;
                }
                break;
            }
        };
        match cli.stream_camera(Request::new(outbound)).await {
            Ok(mut resp) => {
                let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
                // Capture worker: runs on its own OS thread until told to stop or
                // until the selected source/profile no longer matches the running capture.
                let cam_worker = std::thread::spawn({
                    let cam = cam.clone();
                    let telemetry = telemetry.clone();
                    let queue = queue.clone();
                    let drop_log = Arc::clone(&drop_log);
                    let media_controls = media_controls.clone();
                    let initial_source_thread = initial_source.clone();
                    let active_source_thread = active_source.clone();
                    let initial_profile_thread = initial_profile.clone();
                    let active_profile_thread = active_profile.clone();
                    move || {
                        loop {
                            if stop_rx.try_recv().is_ok() {
                                break;
                            }
                            let state = media_controls.refresh();
                            let desired_source =
                                state.camera_source.resolve(initial_source_thread.as_deref());
                            let desired_profile =
                                state.camera_profile.resolve(initial_profile_thread.as_deref());
                            if desired_source != active_source_thread
                                || desired_profile != active_profile_thread
                            {
                                tracing::info!(
                                    from = active_source_thread.as_deref().unwrap_or("auto"),
                                    to = desired_source.as_deref().unwrap_or("auto"),
                                    "📸 webcam source changed; restarting live uplink pipeline"
                                );
                                break;
                            }
                            if !state.camera {
                                // Soft pause: keep the stream open and poll every 25 ms
                                // until the camera is re-enabled, the selection changes,
                                // or a stop is requested.
                                telemetry.record_enabled(false);
                                tracing::info!("📸 webcam uplink soft-paused");
                                while stop_rx.try_recv().is_err() {
                                    let state = media_controls.refresh();
                                    let desired_source =
                                        state.camera_source.resolve(initial_source_thread.as_deref());
                                    let desired_profile = state
                                        .camera_profile
                                        .resolve(initial_profile_thread.as_deref());
                                    if desired_source != active_source_thread
                                        || desired_profile != active_profile_thread
                                    {
                                        break;
                                    }
                                    if state.camera {
                                        break;
                                    }
                                    std::thread::sleep(Duration::from_millis(25));
                                }
                                if stop_rx.try_recv().is_ok() {
                                    break;
                                }
                                // Re-check the selection after the pause before resuming.
                                let state = media_controls.refresh();
                                let desired_source =
                                    state.camera_source.resolve(initial_source_thread.as_deref());
                                let desired_profile = state
                                    .camera_profile
                                    .resolve(initial_profile_thread.as_deref());
                                if desired_source != active_source_thread
                                    || desired_profile != active_profile_thread
                                {
                                    tracing::info!(
                                        from = active_source_thread.as_deref().unwrap_or("auto"),
                                        to = desired_source.as_deref().unwrap_or("auto"),
                                        "📸 webcam source changed while paused; restarting live uplink pipeline"
                                    );
                                    break;
                                }
                                telemetry.record_enabled(true);
                                tracing::info!("📸 webcam uplink resumed");
                            }
                            // No frame available: back off briefly instead of spinning.
                            let Some(pkt) = cam.pull() else {
                                std::thread::sleep(Duration::from_millis(5));
                                continue;
                            };
                            // Process-wide frame counter used only to thin out trace output.
                            static CNT: std::sync::atomic::AtomicU64 =
                                std::sync::atomic::AtomicU64::new(0);
                            let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            if n < 10 || n.is_multiple_of(120) {
                                tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
                            }
                            tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
                            let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
                            let stats = queue.push(pkt, enqueue_age);
                            if stats.dropped_queue_full > 0 {
                                telemetry.record_queue_full_drop(stats.dropped_queue_full);
                                log_uplink_drop(
                                    &drop_log,
                                    UplinkDropReason::QueueFull,
                                    stats.dropped_queue_full,
                                    stats.queue_depth,
                                    duration_ms(enqueue_age),
                                );
                            }
                            telemetry.record_enqueue(
                                queue_depth_u32(stats.queue_depth),
                                duration_ms(enqueue_age),
                                0.0,
                            );
                        }
                        // No more frames will be produced once the worker leaves its
                        // loop. Closing the queue here lets the outbound request stream
                        // finish, so a source/profile change actually tears the call
                        // down instead of leaving it parked in `pop_fresh`. On the
                        // stop-signal path the queue has already been closed by the
                        // caller; closing again is harmless (the caller itself closes
                        // it twice).
                        queue.close();
                    }
                });
                delay = Duration::from_secs(1);
                telemetry.record_connected();
                // Drain server messages; the loop ends when the response stream yields nothing more.
                while resp.get_mut().message().await.transpose().is_some() {}
                telemetry.record_disconnect("camera uplink stream ended");
                queue.close();
                let _ = stop_tx.send(());
                let _ = cam_worker.join();
            }
            // The server has no camera RPC: stop retrying permanently.
            Err(e) if e.code() == tonic::Code::Unimplemented => {
                tracing::warn!("📸 server does not support StreamCamera giving up");
                telemetry.record_disconnect("camera uplink unavailable on server");
                queue.close();
                return;
            }
            Err(e) => {
                telemetry.record_disconnect(format!("camera uplink connect failed: {e}"));
                tracing::warn!("❌📸 connect failed: {e:?}");
                delay = app_support::next_delay(delay);
            }
        }
        queue.close();
        tokio::time::sleep(delay).await;
    }
}
}
#[cfg(not(coverage))]
/// Assemble the startup camera profile id, formatted as `WIDTHxHEIGHT@FPS`.
///
/// The three parts come verbatim from `LESAVKA_CAM_WIDTH`, `LESAVKA_CAM_HEIGHT`
/// and `LESAVKA_CAM_FPS`. Returns `None` if any of them cannot be read.
fn initial_camera_profile_id_from_env() -> Option<String> {
    let read = |key: &str| std::env::var(key).ok();
    match (
        read("LESAVKA_CAM_WIDTH"),
        read("LESAVKA_CAM_HEIGHT"),
        read("LESAVKA_CAM_FPS"),
    ) {
        (Some(width), Some(height), Some(fps)) => Some(format!("{width}x{height}@{fps}")),
        _ => None,
    }
}
#[cfg(not(coverage))]
/// Parse a `WIDTHxHEIGHT@FPS` profile id into `(width, height, fps)`.
///
/// Returns `None` when either separator is missing, when a component is not a
/// valid `u32`, or when any component is zero.
fn parse_camera_profile_id(raw: &str) -> Option<(u32, u32, u32)> {
    let (dimensions, rate) = raw.split_once('@')?;
    let (w, h) = dimensions.split_once('x')?;
    let profile: (u32, u32, u32) = (w.parse().ok()?, h.parse().ok()?, rate.parse().ok()?);
    match profile {
        // A zero in any position is not a usable capture profile.
        (0, _, _) | (_, 0, _) | (_, _, 0) => None,
        valid => Some(valid),
    }
}
#[cfg(not(coverage))]
fn abort_if_required_media_source_failed(
kind: &str,
icon: &str,
source: Option<&str>,
err: &dyn std::fmt::Display,
) {
if !explicit_media_sources_required() || source.is_none_or(|source| source.trim().is_empty()) {
return;
}
let source = source.expect("checked source presence");
error!(
"{icon} required {kind} source '{source}' failed to start; aborting client because LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES=1: {err}"
);
eprintln!(
"{icon} required {kind} source '{source}' failed to start; aborting client because LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES=1: {err}"
);
std::process::exit(2);
}
#[cfg(not(coverage))]
/// Report whether `LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES` is switched on.
///
/// After trimming whitespace, the accepted values are `1`, `true`, `yes` and
/// `on`, compared ASCII case-insensitively. An unset or unreadable variable
/// counts as off.
fn explicit_media_sources_required() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    match std::env::var("LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES") {
        Ok(raw) => {
            let flag = raw.trim();
            TRUTHY
                .iter()
                .any(|accepted| flag.eq_ignore_ascii_case(accepted))
        }
        Err(_) => false,
    }
}
#[cfg(not(coverage))]
/// Queue configuration handed to `FreshPacketQueue::new` for the camera uplink:
/// capacity 32, `max_age` 350 ms, `LatestOnly` policy.
// NOTE(review): the exact meaning of `capacity`, `max_age` and the policy variants
// is defined in `uplink_fresh_queue`, which is not visible here — confirm there.
const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
crate::uplink_fresh_queue::FreshQueueConfig {
capacity: 32,
max_age: Duration::from_millis(350),
policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly,
};
#[cfg(not(coverage))]
/// Queue configuration handed to `FreshPacketQueue::new` for the microphone uplink:
/// capacity 64, `max_age` 400 ms, `DrainOldest` policy.
// NOTE(review): the exact meaning of `capacity`, `max_age` and the policy variants
// is defined in `uplink_fresh_queue`, which is not visible here — confirm there.
const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
crate::uplink_fresh_queue::FreshQueueConfig {
capacity: 64,
max_age: Duration::from_millis(400),
policy: crate::uplink_fresh_queue::FreshQueuePolicy::DrainOldest,
};
#[cfg(not(coverage))]
/// Convert a queue depth to `u32`, clamping to `u32::MAX` when it does not fit.
fn queue_depth_u32(depth: usize) -> u32 {
    match u32::try_from(depth) {
        Ok(fits) => fits,
        Err(_) => u32::MAX,
    }
}
#[cfg(not(coverage))]
/// Express a duration as fractional milliseconds in `f32`.
fn duration_ms(duration: Duration) -> f32 {
    const MILLIS_PER_SECOND: f32 = 1_000.0;
    let seconds = duration.as_secs_f32();
    seconds * MILLIS_PER_SECOND
}
#[cfg(not(coverage))]
/// Express a duration as whole milliseconds, clamping to `u32::MAX` on overflow.
fn duration_ms_u32(duration: Duration) -> u32 {
    let millis = duration.as_millis();
    u32::try_from(millis).unwrap_or(u32::MAX)
}
#[cfg(not(coverage))]
/// Stamp an outgoing audio packet with client-side sequencing and timing fields.
///
/// `seq` comes from a process-wide counter whose first value is 1. The capture
/// timestamp is copied from `packet.pts`, the send timestamp is read from the
/// live capture clock, and the queue depth/age are the values observed when the
/// packet left the uplink queue.
fn attach_audio_timing_metadata(
    packet: &mut AudioPacket,
    queue_depth: usize,
    delivery_age: Duration,
) {
    static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0);
    // `fetch_add` returns the previous value, so add one to make ids 1-based.
    let previous = AUDIO_SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let capture_pts = packet.pts;

    packet.seq = previous.saturating_add(1);
    packet.client_capture_pts_us = capture_pts;
    packet.client_send_pts_us = crate::live_capture_clock::capture_pts_us();
    packet.client_queue_depth = queue_depth_u32(queue_depth);
    packet.client_queue_age_ms = duration_ms_u32(delivery_age);
}
#[cfg(not(coverage))]
/// Stamp an outgoing video packet with client-side sequencing and timing fields.
///
/// `seq` comes from a process-wide counter whose first value is 1. The capture
/// timestamp is copied from `packet.pts`, the send timestamp is read from the
/// live capture clock, and the queue depth/age are the values observed when the
/// packet left the uplink queue.
fn attach_video_timing_metadata(
    packet: &mut VideoPacket,
    queue_depth: usize,
    delivery_age: Duration,
) {
    static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0);
    // `fetch_add` returns the previous value, so add one to make ids 1-based.
    let previous = VIDEO_SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let capture_pts = packet.pts;

    packet.seq = previous.saturating_add(1);
    packet.client_capture_pts_us = capture_pts;
    packet.client_send_pts_us = crate::live_capture_clock::capture_pts_us();
    packet.client_queue_depth = queue_depth_u32(queue_depth);
    packet.client_queue_age_ms = duration_ms_u32(delivery_age);
}
#[cfg(not(coverage))]
/// Why the uplink queue discarded packets, as reported to `UplinkDropLogLimiter`.
#[derive(Clone, Copy, Debug)]
enum UplinkDropReason {
/// Reported by the capture workers when a `push` returns `dropped_queue_full > 0`.
QueueFull,
/// Reported by the outbound streams when `pop_fresh` returns `dropped_stale > 0`.
Stale,
}
#[cfg(not(coverage))]
/// Per-stream state used to aggregate queue-drop reports into periodic warnings.
#[derive(Debug)]
struct UplinkDropLogLimiter {
/// Stream name attached to every log event (callers pass "microphone" or "camera").
stream: &'static str,
/// Prefix placed at the start of the warning message.
icon: &'static str,
/// When the last warning was emitted; `None` until the first one.
last_warn_at: Option<Instant>,
/// Queue-full drops accumulated since the last warning.
suppressed_full: u64,
/// Stale drops accumulated since the last warning.
suppressed_stale: u64,
}
#[cfg(not(coverage))]
/// Rate-limited reporting of upstream queue drops, one limiter per stream.
impl UplinkDropLogLimiter {
    /// Create a limiter for `stream` with empty counters and no warning emitted yet.
    fn new(stream: &'static str, icon: &'static str) -> Self {
        Self {
            last_warn_at: None,
            suppressed_full: 0,
            suppressed_stale: 0,
            stream,
            icon,
        }
    }

    /// Account for `count` dropped packets and log them.
    ///
    /// The count is added to the counter matching `reason`. If no warning has
    /// been emitted yet, or the last one is at least `UPLINK_DROP_WARN_INTERVAL`
    /// old, a WARN carrying both accumulated counters is emitted and the counters
    /// reset. Otherwise only a DEBUG event for this single report is written.
    fn record(&mut self, reason: UplinkDropReason, count: u64, queue_depth: usize, age_ms: f32) {
        let counter = match reason {
            UplinkDropReason::QueueFull => &mut self.suppressed_full,
            UplinkDropReason::Stale => &mut self.suppressed_stale,
        };
        *counter = counter.saturating_add(count);

        let warn_due = match self.last_warn_at {
            None => true,
            Some(last) => last.elapsed() >= UPLINK_DROP_WARN_INTERVAL,
        };
        if !warn_due {
            debug!(
                stream = self.stream,
                ?reason,
                count,
                queue_depth,
                latest_age_ms = age_ms,
                "upstream media queue drop suppressed from WARN noise"
            );
            return;
        }

        warn!(
            stream = self.stream,
            dropped_queue_full = self.suppressed_full,
            dropped_stale = self.suppressed_stale,
            queue_depth,
            latest_age_ms = age_ms,
            "{} upstream {} queue is dropping stale/superseded packets to preserve live A/V sync",
            self.icon,
            self.stream
        );
        // Start a fresh aggregation window.
        self.last_warn_at = Some(Instant::now());
        self.suppressed_stale = 0;
        self.suppressed_full = 0;
    }
}
#[cfg(not(coverage))]
/// Minimum time between two WARN-level drop reports from one `UplinkDropLogLimiter`.
const UPLINK_DROP_WARN_INTERVAL: Duration = Duration::from_secs(5);
#[cfg(not(coverage))]
/// Report an upstream queue drop through the shared rate limiter.
fn log_uplink_drop(
limiter: &Arc<std::sync::Mutex<UplinkDropLogLimiter>>,
reason: UplinkDropReason,
count: u64,
queue_depth: usize,
age_ms: f32,
) {
if let Ok(mut limiter) = limiter.lock() {
limiter.record(reason, count, queue_depth, age_ms);
}
}