use anyhow::{Context, Result, bail};
use gstreamer as gst;
use gstreamer::prelude::*;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::{AudioPacket, OutputDelayProbeRequest, VideoPacket};
use std::f64::consts::TAU;
use std::sync::Arc;
use std::time::Duration;

use crate::audio::Voice;
use crate::camera::{CameraCodec, CameraConfig};
use crate::video::CameraRelay;

const DEFAULT_DURATION_SECONDS: u32 = 20;
const DEFAULT_WARMUP_SECONDS: u32 = 4;
const DEFAULT_PULSE_PERIOD_MS: u32 = 1_000;
const DEFAULT_PULSE_WIDTH_MS: u32 = 120;
// Repeating pulse-width signature: code N keeps an event active for N pulse
// widths, letting the decoder identify individual pulses in the capture.
const DEFAULT_EVENT_WIDTH_CODES: &[u32] = &[
    1, 2, 1, 3, 2, 4, 1, 1, 3, 1, 4, 2, 1, 2, 3, 4, 1, 3, 2, 2, 4, 1, 2, 4, 3, 1, 1, 4, 2, 3, 1, 2,
];
const AUDIO_SAMPLE_RATE: u32 = 48_000;
const AUDIO_CHANNELS: usize = 2;
const AUDIO_CHUNK_MS: u64 = 10;
const AUDIO_AMPLITUDE: f64 = 24_000.0;
const DARK_FRAME_RGB: Rgb = Rgb { r: 4, g: 8, b: 12 };
const EVENT_COLORS: [Rgb; 4] = [
    Rgb { r: 255, g: 45, b: 45 },
    Rgb { r: 0, g: 230, b: 118 },
    Rgb { r: 41, g: 121, b: 255 },
    Rgb { r: 255, g: 179, b: 0 },
];
const EVENT_FREQUENCIES_HZ: [f64; 4] = [660.0, 880.0, 1_100.0, 1_320.0];

#[derive(Clone, Copy, Debug)]
struct Rgb {
    r: u8,
    g: u8,
    b: u8,
}

#[derive(Clone, Debug)]
struct ProbeConfig {
    duration: Duration,
    warmup: Duration,
    pulse_period: Duration,
    pulse_width: Duration,
    event_width_codes: Vec<u32>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OutputDelayProbeSummary {
    pub video_frames: u64,
    pub audio_packets: u64,
    pub event_count: u64,
}

impl ProbeConfig {
    fn from_request(request: &OutputDelayProbeRequest) -> Result<Self> {
        let duration_seconds = non_zero_or_default(
            request.duration_seconds,
            DEFAULT_DURATION_SECONDS,
            "duration_seconds",
        )?;
        let warmup_seconds = if request.warmup_seconds == 0 {
            DEFAULT_WARMUP_SECONDS
        } else {
            request.warmup_seconds
        };
        let pulse_period_ms = non_zero_or_default(
            request.pulse_period_ms,
            DEFAULT_PULSE_PERIOD_MS,
            "pulse_period_ms",
        )?;
        let pulse_width_ms = non_zero_or_default(
            request.pulse_width_ms,
            DEFAULT_PULSE_WIDTH_MS,
            "pulse_width_ms",
        )?;
        if pulse_width_ms >= pulse_period_ms {
            bail!("pulse_width_ms must stay smaller than pulse_period_ms");
        }
        let event_width_codes = parse_event_width_codes(&request.event_width_codes)?;
        Ok(Self {
            duration: Duration::from_secs(u64::from(duration_seconds)),
            warmup: Duration::from_secs(u64::from(warmup_seconds)),
            pulse_period: Duration::from_millis(u64::from(pulse_period_ms)),
            pulse_width: Duration::from_millis(u64::from(pulse_width_ms)),
            event_width_codes,
        })
    }

    /// Active event code at `pts`, or `None` during warmup and between pulses.
    ///
    /// With the defaults (4 s warmup, 1 s period, 120 ms width, codes starting
    /// 1, 2, ...): pts = 4.0 s starts pulse 0 and yields `Some(1)`,
    /// pts = 5.0 s starts pulse 1 (active for 240 ms) and yields `Some(2)`,
    /// while pts = 5.5 s is past that window and yields `None`.
    fn event_code_at(&self, pts: Duration) -> Option<u32> {
        if pts < self.warmup {
            return None;
        }
        let since_warmup = pts.saturating_sub(self.warmup);
        let period_ns = self.pulse_period.as_nanos().max(1);
        let pulse_index = (since_warmup.as_nanos() / period_ns) as usize;
        let pulse_offset_ns = since_warmup.as_nanos() % period_ns;
        let code = self.event_width_codes[pulse_index % self.event_width_codes.len()];
        let active_ns = self.pulse_width.as_nanos().saturating_mul(u128::from(code));
        (pulse_offset_ns < active_ns).then_some(code)
    }

    fn event_count(&self) -> u64 {
        if self.duration <= self.warmup {
            return 0;
        }
        let active = self.duration - self.warmup;
        let count = active.as_nanos() / self.pulse_period.as_nanos().max(1);
        count.try_into().unwrap_or(u64::MAX)
    }
}

fn non_zero_or_default(value: u32, default: u32, name: &str) -> Result<u32> {
    if value == 0 {
        return Ok(default);
    }
    if value == u32::MAX {
        bail!("{name} is too large");
    }
    Ok(value)
}
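/// Parse a comma-separated width-code list such as `"1,3,2"`; blank input
/// falls back to `DEFAULT_EVENT_WIDTH_CODES`, and codes outside 1..=4 are
/// rejected.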
fn parse_event_width_codes(raw: &str) -> Result<Vec<u32>> {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return Ok(DEFAULT_EVENT_WIDTH_CODES.to_vec());
    }
    let codes = trimmed
        .split(',')
        .filter_map(|part| {
            let part = part.trim();
            (!part.is_empty()).then_some(part)
        })
        .map(|part| {
            let code = part
                .parse::<u32>()
                .with_context(|| format!("parsing event width code `{part}`"))?;
            if !(1..=4).contains(&code) {
                bail!("event width code {code} is unsupported; use values 1..=4");
            }
            Ok(code)
        })
        .collect::<Result<Vec<u32>>>()?;
    if codes.is_empty() {
        bail!("event_width_codes must contain at least one code");
    }
    Ok(codes)
}

/// Generate a server-local A/V signature and feed the physical UVC/UAC sinks.
///
/// Inputs: the active camera relay, active UAC voice sink, camera profile, and
/// probe request timing.
/// Outputs: a small count summary after the last generated packet.
/// Why: this probe intentionally bypasses client capture/uplink so the measured
/// skew is the static output-path difference between server UVC and UAC.
#[cfg(not(coverage))]
pub async fn run_server_output_delay_probe(
    relay: Arc<CameraRelay>,
    sink: &mut Voice,
    camera: &CameraConfig,
    request: &OutputDelayProbeRequest,
) -> Result<OutputDelayProbeSummary> {
    let config = ProbeConfig::from_request(request)?;
    if config.event_count() == 0 {
        bail!("probe duration must extend beyond warmup");
    }
    let frame_step = Duration::from_nanos(1_000_000_000u64 / u64::from(camera.fps.max(1)));
    let audio_chunk = Duration::from_millis(AUDIO_CHUNK_MS);
    let samples_per_chunk = ((u64::from(AUDIO_SAMPLE_RATE) * AUDIO_CHUNK_MS) / 1_000) as usize;
    let frames = EncodedProbeFrames::new(camera)?;

    let start = tokio::time::Instant::now();
    let mut frame_index = 0u64;
    let mut audio_index = 0u64;
    let mut video_frames = 0u64;
    let mut audio_packets = 0u64;
    loop {
        // Wake for whichever stream is due next so audio chunks and video
        // frames stay interleaved on a single timeline; on a tie, the audio
        // branch runs first and the frame follows in the same iteration.
        let next_frame_pts = duration_mul(frame_step, frame_index);
        let next_audio_pts = duration_mul(audio_chunk, audio_index);
        let next_pts = next_frame_pts.min(next_audio_pts);
        if next_pts > config.duration {
            break;
        }
        tokio::time::sleep_until(start + next_pts).await;
        if next_audio_pts <= next_frame_pts && next_audio_pts <= config.duration {
            let pts_us = duration_us(next_audio_pts);
            let data = render_audio_chunk(&config, next_audio_pts, samples_per_chunk);
            sink.push(&AudioPacket {
                id: 0,
                pts: pts_us,
                data,
                seq: audio_index.saturating_add(1),
                client_capture_pts_us: pts_us,
                client_send_pts_us: pts_us,
                client_queue_depth: 0,
                client_queue_age_ms: 0,
            });
            audio_packets = audio_packets.saturating_add(1);
            audio_index = audio_index.saturating_add(1);
        }
        if next_frame_pts <= next_audio_pts && next_frame_pts <= config.duration {
            let pts_us = duration_us(next_frame_pts);
            let code = config.event_code_at(next_frame_pts);
            relay.feed(VideoPacket {
                id: 0,
                pts: pts_us,
                data: frames.packet_for_code(code)?.to_vec(),
                seq: frame_index.saturating_add(1),
                effective_fps: camera.fps,
                client_capture_pts_us: pts_us,
                client_send_pts_us: pts_us,
                client_queue_depth: 0,
                client_queue_age_ms: 0,
                ..Default::default()
            });
            video_frames = video_frames.saturating_add(1);
            frame_index = frame_index.saturating_add(1);
        }
    }
    sink.finish();
    Ok(OutputDelayProbeSummary {
        video_frames,
        audio_packets,
        event_count: config.event_count(),
    })
}

#[cfg(coverage)]
pub async fn run_server_output_delay_probe(
    _relay: Arc<CameraRelay>,
    _sink: &mut Voice,
    _camera: &CameraConfig,
    request: &OutputDelayProbeRequest,
) -> Result<OutputDelayProbeSummary> {
    let config = ProbeConfig::from_request(request)?;
    Ok(OutputDelayProbeSummary {
        video_frames: 1,
        audio_packets: 1,
        event_count: config.event_count(),
    })
}

#[cfg(not(coverage))]
struct EncodedProbeFrames {
    dark: Vec<u8>,
    events: [Vec<u8>; 4],
}
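// All probe frames are encoded once up front; the send loop then replays the
// same five JPEG payloads (one dark frame plus the four event colors), so the
// per-frame cost at send time is a single Vec copy.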
#[cfg(not(coverage))]
impl EncodedProbeFrames {
    fn new(camera: &CameraConfig) -> Result<Self> {
        if !matches!(camera.codec, CameraCodec::Mjpeg) {
            bail!(
                "server-generated output-delay probe currently requires MJPEG UVC output, got {}",
                camera.codec.as_str()
            );
        }
        let mut encoder = MjpegFrameEncoder::new(camera)?;
        let dark = encoder.encode_solid(DARK_FRAME_RGB, 0)?;
        let events = [
            encoder.encode_solid(EVENT_COLORS[0], 1)?,
            encoder.encode_solid(EVENT_COLORS[1], 2)?,
            encoder.encode_solid(EVENT_COLORS[2], 3)?,
            encoder.encode_solid(EVENT_COLORS[3], 4)?,
        ];
        Ok(Self { dark, events })
    }

    fn packet_for_code(&self, code: Option<u32>) -> Result<&[u8]> {
        let Some(code) = code else {
            return Ok(&self.dark);
        };
        let index = usize::try_from(code.saturating_sub(1)).unwrap_or(usize::MAX);
        self.events
            .get(index)
            .map(Vec::as_slice)
            .with_context(|| format!("unsupported event code {code}"))
    }
}

#[cfg(not(coverage))]
struct MjpegFrameEncoder {
    src: gst_app::AppSrc,
    sink: gst_app::AppSink,
    pipeline: gst::Pipeline,
    width: usize,
    height: usize,
    frame_step_us: u64,
}
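// One-shot appsrc ! videoconvert ! jpegenc ! capsfilter ! appsink pipeline:
// each encode_solid call pushes a raw RGB buffer in and synchronously pulls
// the encoded JPEG back out. The pipeline is torn down on drop.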
#[cfg(not(coverage))]
impl MjpegFrameEncoder {
    fn new(camera: &CameraConfig) -> Result<Self> {
        gst::init().context("gst init")?;
        let width = camera.width as i32;
        let height = camera.height as i32;
        let fps = camera.fps.max(1) as i32;
        let raw_caps = gst::Caps::builder("video/x-raw")
            .field("format", "RGB")
            .field("width", width)
            .field("height", height)
            .field("framerate", gst::Fraction::new(fps, 1))
            .build();
        let jpeg_caps = gst::Caps::builder("image/jpeg")
            .field("parsed", true)
            .field("width", width)
            .field("height", height)
            .field("framerate", gst::Fraction::new(fps, 1))
            .build();
        let pipeline = gst::Pipeline::new();
        let src = gst::ElementFactory::make("appsrc")
            .name("output_delay_probe_src")
            .build()?
            .downcast::<gst_app::AppSrc>()
            .expect("appsrc");
        src.set_is_live(false);
        src.set_format(gst::Format::Time);
        src.set_property("do-timestamp", false);
        src.set_caps(Some(&raw_caps));
        let convert = gst::ElementFactory::make("videoconvert").build()?;
        let encoder = gst::ElementFactory::make("jpegenc")
            .property("quality", 95i32)
            .build()?;
        let capsfilter = gst::ElementFactory::make("capsfilter")
            .property("caps", &jpeg_caps)
            .build()?;
        let sink = gst::ElementFactory::make("appsink")
            .name("output_delay_probe_sink")
            .property("sync", false)
            .property("emit-signals", false)
            .property("max-buffers", 8u32)
            .build()?
            .downcast::<gst_app::AppSink>()
            .expect("appsink");
        pipeline.add_many([
            src.upcast_ref(),
            &convert,
            &encoder,
            &capsfilter,
            sink.upcast_ref(),
        ])?;
        gst::Element::link_many([
            src.upcast_ref(),
            &convert,
            &encoder,
            &capsfilter,
            sink.upcast_ref(),
        ])?;
        pipeline
            .set_state(gst::State::Playing)
            .context("starting output-delay probe MJPEG encoder")?;
        Ok(Self {
            src,
            sink,
            pipeline,
            width: camera.width as usize,
            height: camera.height as usize,
            frame_step_us: (1_000_000u64 / u64::from(camera.fps.max(1))).max(1),
        })
    }

    fn encode_solid(&mut self, color: Rgb, sequence: u64) -> Result<Vec<u8>> {
        let pts_us = sequence.saturating_mul(self.frame_step_us);
        let frame = solid_rgb_frame(self.width, self.height, color);
        let mut buffer = gst::Buffer::from_slice(frame);
        if let Some(meta) = buffer.get_mut() {
            let pts = gst::ClockTime::from_useconds(pts_us);
            meta.set_pts(Some(pts));
            meta.set_dts(Some(pts));
            meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us)));
        }
        // One raw buffer in, one JPEG sample out; pull_sample blocks until the
        // encoded frame is available.
        self.src
            .push_buffer(buffer)
            .context("encoding output-delay probe frame")?;
        let sample = self
            .sink
            .pull_sample()
            .context("pulling encoded output-delay probe frame")?;
        let buffer = sample.buffer().context("encoded frame had no buffer")?;
        let map = buffer
            .map_readable()
            .context("mapping encoded output-delay probe frame")?;
        Ok(map.as_slice().to_vec())
    }
}

#[cfg(not(coverage))]
impl Drop for MjpegFrameEncoder {
    fn drop(&mut self) {
        let _ = self.src.end_of_stream();
        let _ = self.pipeline.set_state(gst::State::Null);
    }
}

#[cfg(not(coverage))]
fn solid_rgb_frame(width: usize, height: usize, color: Rgb) -> Vec<u8> {
    let mut frame = vec![0u8; width.saturating_mul(height).saturating_mul(3)];
    for pixel in frame.chunks_exact_mut(3) {
        pixel[0] = color.r;
        pixel[1] = color.g;
        pixel[2] = color.b;
    }
    frame
}

fn render_audio_chunk(
    config: &ProbeConfig,
    chunk_pts: Duration,
    samples_per_chunk: usize,
) -> Vec<u8> {
    let sample_step = Duration::from_nanos(1_000_000_000u64 / u64::from(AUDIO_SAMPLE_RATE));
    let mut pcm =
        Vec::with_capacity(samples_per_chunk * AUDIO_CHANNELS * std::mem::size_of::<i16>());
    for sample_index in 0..samples_per_chunk {
        let sample_pts = chunk_pts + duration_mul(sample_step, sample_index as u64);
        // The phase is derived from the absolute pts, so the tone stays
        // continuous across chunk boundaries.
        let sample = config
            .event_code_at(sample_pts)
            .and_then(event_frequency_hz)
            .map(|frequency| {
                let phase = TAU * frequency * sample_pts.as_secs_f64();
                (phase.sin() * AUDIO_AMPLITUDE) as i16
            })
            .unwrap_or(0);
        for _ in 0..AUDIO_CHANNELS {
            pcm.extend_from_slice(&sample.to_le_bytes());
        }
    }
    pcm
}
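/// Map a width code to its probe tone (code 1 => 660 Hz ... code 4 => 1320 Hz);
/// codes outside the table yield `None`.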
fn event_frequency_hz(code: u32) -> Option<f64> {
    EVENT_FREQUENCIES_HZ
        .get(code.checked_sub(1)? as usize)
        .copied()
}

fn duration_us(duration: Duration) -> u64 {
    duration.as_micros().min(u128::from(u64::MAX)) as u64
}

fn duration_mul(duration: Duration, count: u64) -> Duration {
    let nanos = duration
        .as_nanos()
        .saturating_mul(u128::from(count))
        .min(u128::from(u64::MAX));
    Duration::from_nanos(nanos as u64)
}

#[cfg(test)]
mod tests {
    use super::{ProbeConfig, render_audio_chunk};
    use lesavka_common::lesavka::OutputDelayProbeRequest;
    use std::time::Duration;

    #[test]
    fn request_defaults_to_long_coded_server_probe() {
        let config =
            ProbeConfig::from_request(&OutputDelayProbeRequest::default()).expect("default config");
        assert_eq!(config.duration, Duration::from_secs(20));
        assert_eq!(config.warmup, Duration::from_secs(4));
        assert_eq!(config.event_count(), 16);
        assert_eq!(config.event_code_at(Duration::from_secs(4)), Some(1));
        assert_eq!(config.event_code_at(Duration::from_secs(5)), Some(2));
    }

    #[test]
    fn event_codes_reject_unsupported_signatures() {
        let request = OutputDelayProbeRequest {
            event_width_codes: "1,5".to_string(),
            ..Default::default()
        };
        assert!(ProbeConfig::from_request(&request).is_err());
    }

    #[test]
    fn audio_chunk_contains_tone_only_during_coded_pulse() {
        let config = ProbeConfig::from_request(&OutputDelayProbeRequest {
            duration_seconds: 6,
            warmup_seconds: 1,
            pulse_period_ms: 1_000,
            pulse_width_ms: 120,
            event_width_codes: "3".to_string(),
        })
        .expect("config");
        let active = render_audio_chunk(&config, Duration::from_secs(1), 480);
        let idle = render_audio_chunk(&config, Duration::from_millis(500), 480);
        assert!(active.iter().any(|byte| *byte != 0));
        assert!(idle.iter().all(|byte| *byte == 0));
    }
}