use anyhow::{Context, Result, bail}; use gstreamer as gst; use gstreamer::prelude::*; use gstreamer_app as gst_app; use lesavka_common::lesavka::{AudioPacket, OutputDelayProbeRequest, VideoPacket}; use serde::Serialize; use std::f64::consts::TAU; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crate::audio::Voice; use crate::camera::{CameraCodec, CameraConfig}; use crate::video::CameraRelay; const DEFAULT_DURATION_SECONDS: u32 = 20; const DEFAULT_WARMUP_SECONDS: u32 = 4; const DEFAULT_PULSE_PERIOD_MS: u32 = 1_000; const DEFAULT_PULSE_WIDTH_MS: u32 = 120; const DEFAULT_EVENT_WIDTH_CODES: &[u32] = &[ 1, 2, 1, 3, 2, 4, 1, 1, 3, 1, 4, 2, 1, 2, 3, 4, 1, 3, 2, 2, 4, 1, 2, 4, 3, 1, 1, 4, 2, 3, 1, 2, ]; const AUDIO_SAMPLE_RATE: u32 = 48_000; const AUDIO_CHANNELS: usize = 2; const AUDIO_CHUNK_MS: u64 = 10; const AUDIO_AMPLITUDE: f64 = 24_000.0; const DARK_FRAME_RGB: Rgb = Rgb { r: 4, g: 8, b: 12 }; const EVENT_COLORS: [Rgb; 4] = [ Rgb { r: 255, g: 45, b: 45, }, Rgb { r: 0, g: 230, b: 118, }, Rgb { r: 41, g: 121, b: 255, }, Rgb { r: 255, g: 179, b: 0, }, ]; const EVENT_FREQUENCIES_HZ: [f64; 4] = [660.0, 880.0, 1_100.0, 1_320.0]; #[derive(Clone, Copy, Debug)] struct Rgb { r: u8, g: u8, b: u8, } #[derive(Clone, Debug)] struct ProbeConfig { duration: Duration, warmup: Duration, pulse_period: Duration, pulse_width: Duration, event_width_codes: Vec, audio_delay: Duration, video_delay: Duration, } #[derive(Clone, Debug, PartialEq, Eq)] pub struct OutputDelayProbeSummary { pub video_frames: u64, pub audio_packets: u64, pub event_count: u64, pub timeline_json: String, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] struct ProbeEventSlot { event_id: usize, code: u32, planned_start_us: u64, planned_end_us: u64, } #[derive(Clone, Debug, Serialize)] struct OutputDelayProbeTimeline { schema: &'static str, origin: &'static str, media_path: &'static str, camera_width: u32, camera_height: u32, camera_fps: u32, audio_sample_rate: u32, audio_channels: usize, audio_chunk_ms: u64, audio_delay_us: u64, video_delay_us: u64, server_start_unix_ns: u128, pulse_period_ms: u64, pulse_width_ms: u64, warmup_us: u64, duration_us: u64, events: Vec, } #[derive(Clone, Debug, Serialize)] struct OutputDelayProbeEventTimeline { event_id: usize, code: u32, planned_start_us: u64, planned_end_us: u64, video_seq: Option, audio_seq: Option, video_feed_monotonic_us: Option, audio_push_monotonic_us: Option, video_feed_unix_ns: Option, audio_push_unix_ns: Option, server_feed_delta_ms: Option, } impl OutputDelayProbeTimeline { fn new(config: &ProbeConfig, camera: &CameraConfig, server_start_unix_ns: u128) -> Self { let event_count = config.event_count(); let events = (0..event_count) .map(|event_id| { let slot = config.event_slot_by_id(event_id as usize); OutputDelayProbeEventTimeline { event_id: event_id as usize, code: slot.code, planned_start_us: slot.planned_start_us, planned_end_us: slot.planned_end_us, video_seq: None, audio_seq: None, video_feed_monotonic_us: None, audio_push_monotonic_us: None, video_feed_unix_ns: None, audio_push_unix_ns: None, server_feed_delta_ms: None, } }) .collect(); Self { schema: "lesavka.output-delay-server-timeline.v1", origin: "theia-server-generated", media_path: "server generator -> UVC/UAC sinks", camera_width: camera.width, camera_height: camera.height, camera_fps: camera.fps, audio_sample_rate: AUDIO_SAMPLE_RATE, audio_channels: AUDIO_CHANNELS, audio_chunk_ms: AUDIO_CHUNK_MS, audio_delay_us: duration_us(config.audio_delay), video_delay_us: duration_us(config.video_delay), server_start_unix_ns, pulse_period_ms: config.pulse_period.as_millis() as u64, pulse_width_ms: config.pulse_width.as_millis() as u64, warmup_us: duration_us(config.warmup), duration_us: duration_us(config.duration), events, } } fn mark_audio(&mut self, slot: ProbeEventSlot, seq: u64, monotonic_us: u64, unix_ns: u128) { let Some(event) = self.events.get_mut(slot.event_id) else { return; }; if event.audio_push_monotonic_us.is_none() { event.audio_seq = Some(seq); event.audio_push_monotonic_us = Some(monotonic_us); event.audio_push_unix_ns = Some(unix_ns); event.update_delta(); } } fn mark_video(&mut self, slot: ProbeEventSlot, seq: u64, monotonic_us: u64, unix_ns: u128) { let Some(event) = self.events.get_mut(slot.event_id) else { return; }; if event.video_feed_monotonic_us.is_none() { event.video_seq = Some(seq); event.video_feed_monotonic_us = Some(monotonic_us); event.video_feed_unix_ns = Some(unix_ns); event.update_delta(); } } } impl OutputDelayProbeEventTimeline { fn update_delta(&mut self) { let (Some(audio_us), Some(video_us)) = (self.audio_push_monotonic_us, self.video_feed_monotonic_us) else { return; }; self.server_feed_delta_ms = Some((audio_us as f64 - video_us as f64) / 1000.0); } } impl ProbeConfig { fn from_request(request: &OutputDelayProbeRequest) -> Result { let duration_seconds = non_zero_or_default( request.duration_seconds, DEFAULT_DURATION_SECONDS, "duration_seconds", )?; let warmup_seconds = if request.warmup_seconds == 0 { DEFAULT_WARMUP_SECONDS } else { request.warmup_seconds }; let pulse_period_ms = non_zero_or_default( request.pulse_period_ms, DEFAULT_PULSE_PERIOD_MS, "pulse_period_ms", )?; let pulse_width_ms = non_zero_or_default( request.pulse_width_ms, DEFAULT_PULSE_WIDTH_MS, "pulse_width_ms", )?; if pulse_width_ms >= pulse_period_ms { bail!("pulse_width_ms must stay smaller than pulse_period_ms"); } let event_width_codes = parse_event_width_codes(&request.event_width_codes)?; Ok(Self { duration: Duration::from_secs(u64::from(duration_seconds)), warmup: Duration::from_secs(u64::from(warmup_seconds)), pulse_period: Duration::from_millis(u64::from(pulse_period_ms)), pulse_width: Duration::from_millis(u64::from(pulse_width_ms)), event_width_codes, audio_delay: positive_delay(request.audio_delay_us, "audio_delay_us")?, video_delay: positive_delay(request.video_delay_us, "video_delay_us")?, }) } fn event_code_at(&self, pts: Duration) -> Option { self.event_slot_at(pts).map(|slot| slot.code) } fn event_slot_at(&self, pts: Duration) -> Option { if pts < self.warmup { return None; } let since_warmup = pts.saturating_sub(self.warmup); let period_ns = self.pulse_period.as_nanos().max(1); let pulse_index = (since_warmup.as_nanos() / period_ns) as usize; let pulse_offset_ns = since_warmup.as_nanos() % period_ns; let code = self.event_width_codes[pulse_index % self.event_width_codes.len()]; let active_ns = self.pulse_width.as_nanos().saturating_mul(u128::from(code)); (pulse_offset_ns < active_ns).then(|| self.event_slot_by_id(pulse_index)) } fn event_slot_by_id(&self, event_id: usize) -> ProbeEventSlot { let code = self.event_width_codes[event_id % self.event_width_codes.len()]; let planned_start = self .warmup .saturating_add(duration_mul(self.pulse_period, event_id as u64)); let planned_end = planned_start.saturating_add(duration_mul(self.pulse_width, u64::from(code))); ProbeEventSlot { event_id, code, planned_start_us: duration_us(planned_start), planned_end_us: duration_us(planned_end), } } fn event_count(&self) -> u64 { if self.duration <= self.warmup { return 0; } let active = self.duration - self.warmup; let count = active.as_nanos() / self.pulse_period.as_nanos().max(1); count.try_into().unwrap_or(u64::MAX) } } fn non_zero_or_default(value: u32, default: u32, name: &str) -> Result { if value == 0 { return Ok(default); } if value == u32::MAX { bail!("{name} is too large"); } Ok(value) } fn positive_delay(value_us: i64, name: &str) -> Result { if value_us < 0 { bail!("{name} must be zero or positive for the direct output-delay probe"); } Ok(Duration::from_micros(value_us as u64)) } fn parse_event_width_codes(raw: &str) -> Result> { let trimmed = raw.trim(); if trimmed.is_empty() { return Ok(DEFAULT_EVENT_WIDTH_CODES.to_vec()); } let codes = trimmed .split(',') .filter_map(|part| { let part = part.trim(); (!part.is_empty()).then_some(part) }) .map(|part| { let code = part .parse::() .with_context(|| format!("parsing event width code `{part}`"))?; if !(1..=4).contains(&code) { bail!("event width code {code} is unsupported; use values 1..4"); } Ok(code) }) .collect::>>()?; if codes.is_empty() { bail!("event_width_codes must contain at least one code"); } Ok(codes) } /// Generate a server-local A/V signature and feed the physical UVC/UAC sinks. /// /// Inputs: the active camera relay, active UAC voice sink, camera profile, and /// probe request timing. /// Outputs: a small count summary after the last generated packet. /// Why: this probe intentionally bypasses client capture/uplink so the measured /// skew is the static output-path difference between server UVC and UAC. #[cfg(not(coverage))] pub async fn run_server_output_delay_probe( relay: Arc, sink: &mut Voice, camera: &CameraConfig, request: &OutputDelayProbeRequest, ) -> Result { let config = ProbeConfig::from_request(request)?; if config.event_count() == 0 { bail!("probe duration must extend beyond warmup"); } let frame_step = Duration::from_nanos(1_000_000_000u64 / u64::from(camera.fps.max(1))); let audio_chunk = Duration::from_millis(AUDIO_CHUNK_MS); let samples_per_chunk = ((u64::from(AUDIO_SAMPLE_RATE) * AUDIO_CHUNK_MS) / 1_000) as usize; let frames = EncodedProbeFrames::new(camera)?; let server_start_unix_ns = unix_ns_now(); let start = tokio::time::Instant::now(); let mut timeline = OutputDelayProbeTimeline::new(&config, camera, server_start_unix_ns); let mut frame_index = 0u64; let mut audio_index = 0u64; let mut video_frames = 0u64; let mut audio_packets = 0u64; loop { let next_frame_pts = duration_mul(frame_step, frame_index); let next_audio_pts = duration_mul(audio_chunk, audio_index); let frame_active = next_frame_pts <= config.duration; let audio_active = next_audio_pts <= config.duration; if !frame_active && !audio_active { break; } let next_frame_due = if frame_active { next_frame_pts.saturating_add(config.video_delay) } else { Duration::MAX }; let next_audio_due = if audio_active { next_audio_pts.saturating_add(config.audio_delay) } else { Duration::MAX }; tokio::time::sleep_until(start + next_frame_due.min(next_audio_due)).await; if audio_active && next_audio_due <= next_frame_due { let pts_us = duration_us(next_audio_pts); let event_slot = config.event_slot_at(next_audio_pts); let data = render_audio_chunk(&config, next_audio_pts, samples_per_chunk); let seq = audio_index.saturating_add(1); sink.push(&AudioPacket { id: 0, pts: pts_us, data, seq, client_capture_pts_us: pts_us, client_send_pts_us: pts_us, client_queue_depth: 0, client_queue_age_ms: 0, }); if let Some(slot) = event_slot { let monotonic_us = monotonic_us_since(start); timeline.mark_audio( slot, seq, monotonic_us, unix_ns_from_start(server_start_unix_ns, monotonic_us), ); } audio_packets = audio_packets.saturating_add(1); audio_index = audio_index.saturating_add(1); } if frame_active && next_frame_due <= next_audio_due { let pts_us = duration_us(next_frame_pts); let event_slot = config.event_slot_at(next_frame_pts); let code = event_slot.map(|slot| slot.code); let seq = frame_index.saturating_add(1); relay.feed(VideoPacket { id: 0, pts: pts_us, data: frames.packet_for_code(code)?.to_vec(), seq, effective_fps: camera.fps, client_capture_pts_us: pts_us, client_send_pts_us: pts_us, client_queue_depth: 0, client_queue_age_ms: 0, ..Default::default() }); if let Some(slot) = event_slot { let monotonic_us = monotonic_us_since(start); timeline.mark_video( slot, seq, monotonic_us, unix_ns_from_start(server_start_unix_ns, monotonic_us), ); } video_frames = video_frames.saturating_add(1); frame_index = frame_index.saturating_add(1); } } sink.finish(); Ok(OutputDelayProbeSummary { video_frames, audio_packets, event_count: config.event_count(), timeline_json: serde_json::to_string(&timeline) .context("serializing output-delay server timeline")?, }) } #[cfg(coverage)] pub async fn run_server_output_delay_probe( _relay: Arc, _sink: &mut Voice, camera: &CameraConfig, request: &OutputDelayProbeRequest, ) -> Result { let config = ProbeConfig::from_request(request)?; Ok(OutputDelayProbeSummary { video_frames: 1, audio_packets: 1, event_count: config.event_count(), timeline_json: serde_json::to_string(&OutputDelayProbeTimeline::new( &config, camera, unix_ns_now(), )) .unwrap_or_else(|_| "{}".to_string()), }) } #[cfg(not(coverage))] struct EncodedProbeFrames { dark: Vec, events: [Vec; 4], } #[cfg(not(coverage))] impl EncodedProbeFrames { fn new(camera: &CameraConfig) -> Result { if !matches!(camera.codec, CameraCodec::Mjpeg) { bail!( "server-generated output-delay probe currently requires MJPEG UVC output, got {}", camera.codec.as_str() ); } let mut encoder = MjpegFrameEncoder::new(camera)?; let dark = encoder.encode_solid(DARK_FRAME_RGB, 0)?; let events = [ encoder.encode_solid(EVENT_COLORS[0], 1)?, encoder.encode_solid(EVENT_COLORS[1], 2)?, encoder.encode_solid(EVENT_COLORS[2], 3)?, encoder.encode_solid(EVENT_COLORS[3], 4)?, ]; Ok(Self { dark, events }) } fn packet_for_code(&self, code: Option) -> Result<&[u8]> { let Some(code) = code else { return Ok(&self.dark); }; let index = usize::try_from(code.saturating_sub(1)).unwrap_or(usize::MAX); self.events .get(index) .map(Vec::as_slice) .with_context(|| format!("unsupported event code {code}")) } } #[cfg(not(coverage))] struct MjpegFrameEncoder { src: gst_app::AppSrc, sink: gst_app::AppSink, pipeline: gst::Pipeline, width: usize, height: usize, frame_step_us: u64, } #[cfg(not(coverage))] impl MjpegFrameEncoder { fn new(camera: &CameraConfig) -> Result { gst::init().context("gst init")?; let width = camera.width as i32; let height = camera.height as i32; let fps = camera.fps.max(1) as i32; let raw_caps = gst::Caps::builder("video/x-raw") .field("format", "RGB") .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .build(); let jpeg_caps = gst::Caps::builder("image/jpeg") .field("parsed", true) .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .build(); let pipeline = gst::Pipeline::new(); let src = gst::ElementFactory::make("appsrc") .name("output_delay_probe_src") .build()? .downcast::() .expect("appsrc"); src.set_is_live(false); src.set_format(gst::Format::Time); src.set_property("do-timestamp", false); src.set_caps(Some(&raw_caps)); let convert = gst::ElementFactory::make("videoconvert").build()?; let encoder = gst::ElementFactory::make("jpegenc") .property("quality", 95i32) .build()?; let capsfilter = gst::ElementFactory::make("capsfilter") .property("caps", &jpeg_caps) .build()?; let sink = gst::ElementFactory::make("appsink") .name("output_delay_probe_sink") .property("sync", false) .property("emit-signals", false) .property("max-buffers", 8u32) .build()? .downcast::() .expect("appsink"); pipeline.add_many([ src.upcast_ref(), &convert, &encoder, &capsfilter, sink.upcast_ref(), ])?; gst::Element::link_many([ src.upcast_ref(), &convert, &encoder, &capsfilter, sink.upcast_ref(), ])?; pipeline .set_state(gst::State::Playing) .context("starting output-delay probe MJPEG encoder")?; Ok(Self { src, sink, pipeline, width: camera.width as usize, height: camera.height as usize, frame_step_us: (1_000_000u64 / u64::from(camera.fps.max(1))).max(1), }) } fn encode_solid(&mut self, color: Rgb, sequence: u64) -> Result> { let pts_us = sequence.saturating_mul(self.frame_step_us); let frame = solid_rgb_frame(self.width, self.height, color); let mut buffer = gst::Buffer::from_slice(frame); if let Some(meta) = buffer.get_mut() { let pts = gst::ClockTime::from_useconds(pts_us); meta.set_pts(Some(pts)); meta.set_dts(Some(pts)); meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us))); } self.src .push_buffer(buffer) .context("encoding output-delay probe frame")?; let sample = self .sink .pull_sample() .context("pulling encoded output-delay probe frame")?; let buffer = sample.buffer().context("encoded frame had no buffer")?; let map = buffer .map_readable() .context("mapping encoded output-delay probe frame")?; Ok(map.as_slice().to_vec()) } } #[cfg(not(coverage))] impl Drop for MjpegFrameEncoder { fn drop(&mut self) { let _ = self.src.end_of_stream(); let _ = self.pipeline.set_state(gst::State::Null); } } #[cfg(not(coverage))] fn solid_rgb_frame(width: usize, height: usize, color: Rgb) -> Vec { let mut frame = vec![0u8; width.saturating_mul(height).saturating_mul(3)]; for pixel in frame.chunks_exact_mut(3) { pixel[0] = color.r; pixel[1] = color.g; pixel[2] = color.b; } frame } fn render_audio_chunk( config: &ProbeConfig, chunk_pts: Duration, samples_per_chunk: usize, ) -> Vec { let sample_step = Duration::from_nanos(1_000_000_000u64 / u64::from(AUDIO_SAMPLE_RATE)); let mut pcm = Vec::with_capacity(samples_per_chunk * AUDIO_CHANNELS * std::mem::size_of::()); for sample_index in 0..samples_per_chunk { let sample_pts = chunk_pts + duration_mul(sample_step, sample_index as u64); let sample = config .event_code_at(sample_pts) .and_then(event_frequency_hz) .map(|frequency| { let phase = TAU * frequency * sample_pts.as_secs_f64(); (phase.sin() * AUDIO_AMPLITUDE) as i16 }) .unwrap_or(0); for _ in 0..AUDIO_CHANNELS { pcm.extend_from_slice(&sample.to_le_bytes()); } } pcm } fn event_frequency_hz(code: u32) -> Option { EVENT_FREQUENCIES_HZ .get(code.checked_sub(1)? as usize) .copied() } fn duration_us(duration: Duration) -> u64 { duration.as_micros().min(u128::from(u64::MAX)) as u64 } fn unix_ns_now() -> u128 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_nanos() } fn unix_ns_from_start(server_start_unix_ns: u128, monotonic_us: u64) -> u128 { server_start_unix_ns.saturating_add(u128::from(monotonic_us).saturating_mul(1_000)) } fn monotonic_us_since(start: tokio::time::Instant) -> u64 { duration_us(tokio::time::Instant::now().saturating_duration_since(start)) } fn duration_mul(duration: Duration, count: u64) -> Duration { let nanos = duration .as_nanos() .saturating_mul(u128::from(count)) .min(u128::from(u64::MAX)); Duration::from_nanos(nanos as u64) } #[cfg(test)] mod tests { use super::{ OutputDelayProbeTimeline, ProbeConfig, duration_us, render_audio_chunk, unix_ns_from_start, }; use crate::camera::{CameraCodec, CameraConfig, CameraOutput}; use lesavka_common::lesavka::OutputDelayProbeRequest; use std::time::Duration; #[test] fn request_defaults_to_long_coded_server_probe() { let config = ProbeConfig::from_request(&OutputDelayProbeRequest::default()).expect("default config"); assert_eq!(config.duration, Duration::from_secs(20)); assert_eq!(config.warmup, Duration::from_secs(4)); assert_eq!(config.event_count(), 16); assert_eq!(config.event_code_at(Duration::from_secs(4)), Some(1)); assert_eq!(config.event_code_at(Duration::from_secs(5)), Some(2)); } #[test] fn event_codes_reject_unsupported_signatures() { let request = OutputDelayProbeRequest { event_width_codes: "1,5".to_string(), ..Default::default() }; assert!(ProbeConfig::from_request(&request).is_err()); } #[test] fn audio_chunk_contains_tone_only_during_coded_pulse() { let config = ProbeConfig::from_request(&OutputDelayProbeRequest { duration_seconds: 6, warmup_seconds: 1, pulse_period_ms: 1_000, pulse_width_ms: 120, event_width_codes: "3".to_string(), audio_delay_us: 0, video_delay_us: 0, }) .expect("config"); let active = render_audio_chunk(&config, Duration::from_secs(1), 480); let idle = render_audio_chunk(&config, Duration::from_millis(500), 480); assert!(active.iter().any(|byte| *byte != 0)); assert!(idle.iter().all(|byte| *byte == 0)); } #[test] fn timeline_exports_wall_clock_fields_for_freshness() { let config = ProbeConfig::from_request(&OutputDelayProbeRequest { duration_seconds: 6, warmup_seconds: 1, pulse_period_ms: 1_000, pulse_width_ms: 120, event_width_codes: "1".to_string(), audio_delay_us: 0, video_delay_us: 0, }) .expect("config"); let camera = CameraConfig { output: CameraOutput::Uvc, codec: CameraCodec::Mjpeg, width: 640, height: 480, fps: 20, hdmi: None, }; let start_unix_ns = 1_700_000_000_000_000_000u128; let mut timeline = OutputDelayProbeTimeline::new(&config, &camera, start_unix_ns); let slot = config.event_slot_by_id(0); let video_us = duration_us(Duration::from_micros(slot.planned_start_us)); let audio_us = video_us.saturating_add(500); timeline.mark_video( slot, 1, video_us, unix_ns_from_start(start_unix_ns, video_us), ); timeline.mark_audio( slot, 1, audio_us, unix_ns_from_start(start_unix_ns, audio_us), ); let json = serde_json::to_value(timeline).expect("timeline json"); assert_eq!( json["server_start_unix_ns"].as_u64(), Some(start_unix_ns as u64) ); assert_eq!( json["events"][0]["video_feed_unix_ns"].as_u64(), Some(unix_ns_from_start(start_unix_ns, video_us) as u64) ); assert_eq!( json["events"][0]["audio_push_unix_ns"].as_u64(), Some(unix_ns_from_start(start_unix_ns, audio_us) as u64) ); assert_eq!( json["events"][0]["server_feed_delta_ms"].as_f64(), Some(0.5) ); } }