diff --git a/AGENTS.md b/AGENTS.md index 4e395f5..410ad12 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -134,7 +134,8 @@ Context: Google Meet testing on 2026-04-30 showed audio roughly 8 seconds behind ### Phase 1: Build The Probe - [x] Create this tracked checklist in `AGENTS.md`. - [x] Inventory existing `client/src/sync_probe/` code and decide what can be reused. - - Reuse the existing synthetic beacon in `client/src/sync_probe/`. + - Reuse the existing analyzer/coded-pulse model, but keep direct UVC/UAC + output-delay media server-generated. - Reuse the existing Tethys capture harness in `scripts/manual/run_upstream_av_sync.sh`. - Reuse and extend `lesavka-sync-analyze`; current gap is structured evidence output, not capture generation. - [x] Define the phase-1 output contract: @@ -142,7 +143,7 @@ Context: Google Meet testing on 2026-04-30 showed audio roughly 8 seconds behind - [x] `report.txt` - [x] per-event rows with event id, video time, audio time, skew, and confidence - [x] pass/fail verdict using preferred/acceptable/catastrophic thresholds -- [x] Add a deterministic local sync beacon source: +- [x] Add a deterministic server-output sync beacon source: - [x] video flash pattern with event identity or cadence - [x] simultaneous audio click/beep - [x] stable event schedule suitable for automated detection @@ -154,7 +155,7 @@ Context: Google Meet testing on 2026-04-30 showed audio roughly 8 seconds behind - [x] detect audio clicks - [x] pair events and compute skew - [x] Add a runner that can launch or instruct the Tethys probe safely over SSH without rebooting or restarting the desktop. -- [x] Store probe artifacts under `/tmp/lesavka-sync-probe-*` by default. +- [x] Store direct UVC/UAC probe artifacts under `/tmp/lesavka-output-delay-probe-*` by default. - [x] Keep the probe usable without Google Meet first; Google Meet validation is a later application-level check. ### Phase 2: Use Probe To Root-Cause Desync @@ -162,7 +163,7 @@ Context: Google Meet testing on 2026-04-30 showed audio roughly 8 seconds behind - First live run reached the devices but exposed analyzer/tooling gaps instead of a valid skew report. - Fixed the manual probe tunnel to preserve HTTPS/mTLS through SSH (`LESAVKA_SERVER_SCHEME=https`, `LESAVKA_TLS_DOMAIN=lesavka-server`). - Fixed analyzer handling for MJPEG captures whose FFprobe metadata over-reports frames versus decodable video frames. -- [x] Compare client-generated event times against Tethys-observed times. +- [x] Compare server-generated paired output signatures against Tethys-observed device capture times. - The preserved Tethys capture had 323 decodable frames with constant brightness, so no video flash reached UVC. - Server logs show the probe entered a stale upstream session and dropped audio as ~326 seconds late. - [x] Identify whether delay appears before server planning, at server UAC sink, at UVC helper, inside Tethys device capture, or inside browser/WebRTC. diff --git a/Cargo.lock b/Cargo.lock index d9c5a39..e659661 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.19.1" +version = "0.19.2" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.19.1" +version = "0.19.2" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.19.1" +version = "0.19.2" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 4395256..c13c2e2 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.19.1" +version = "0.19.2" edition = "2024" [dependencies] diff --git a/client/src/bin/lesavka-relayctl.rs b/client/src/bin/lesavka-relayctl.rs index cf64676..591687e 100644 --- a/client/src/bin/lesavka-relayctl.rs +++ b/client/src/bin/lesavka-relayctl.rs @@ -1,7 +1,7 @@ use anyhow::{Context, Result, bail}; use lesavka_common::lesavka::{ CalibrationAction, CalibrationRequest, CalibrationState, CapturePowerCommand, HandshakeSet, - SetCapturePowerRequest, relay_client::RelayClient, + OutputDelayProbeRequest, SetCapturePowerRequest, relay_client::RelayClient, }; #[cfg(not(coverage))] use lesavka_common::lesavka::{Empty, handshake_client::HandshakeClient}; @@ -28,6 +28,7 @@ enum CommandKind { RecoverUvc, ResetUsb, UpstreamSync, + OutputDelayProbe, } impl CommandKind { @@ -53,6 +54,7 @@ impl CommandKind { "recover-uvc" => Some(Self::RecoverUvc), "reset-usb" | "hard-reset-usb" => Some(Self::ResetUsb), "upstream-sync" | "sync" => Some(Self::UpstreamSync), + "output-delay-probe" | "probe-output-delay" => Some(Self::OutputDelayProbe), _ => None, } } @@ -65,6 +67,23 @@ struct Config { audio_delta_us: i64, video_delta_us: i64, note: String, + probe_duration_seconds: u32, + probe_warmup_seconds: u32, + probe_pulse_period_ms: u32, + probe_pulse_width_ms: u32, + probe_event_width_codes: String, +} + +#[derive(Debug, Default, Eq, PartialEq)] +struct ParsedCommandArgs { + audio_delta_us: i64, + video_delta_us: i64, + note: String, + probe_duration_seconds: u32, + probe_warmup_seconds: u32, + probe_pulse_period_ms: u32, + probe_pulse_width_ms: u32, + probe_event_width_codes: String, } #[derive(Debug, Eq, PartialEq)] @@ -74,7 +93,7 @@ enum ParseOutcome { } fn usage() -> &'static str { - "Usage: lesavka-relayctl [--server http://HOST:50051] [note]|calibration-save-default|calibration-restore-default|calibration-restore-factory|auto|on|off|recover-usb|recover-uac|recover-uvc|reset-usb>" + "Usage: lesavka-relayctl [--server http://HOST:50051] [note]|calibration-save-default|calibration-restore-default|calibration-restore-factory|auto|on|off|recover-usb|recover-uac|recover-uvc|reset-usb>" } fn parse_args_outcome_from(args: I) -> Result @@ -110,22 +129,31 @@ where } let command = command.unwrap_or(CommandKind::Status); - let (audio_delta_us, video_delta_us, note) = parse_command_args(command, command_args)?; + let parsed = parse_command_args(command, command_args)?; Ok(ParseOutcome::Run(Config { server, command, - audio_delta_us, - video_delta_us, - note, + audio_delta_us: parsed.audio_delta_us, + video_delta_us: parsed.video_delta_us, + note: parsed.note, + probe_duration_seconds: parsed.probe_duration_seconds, + probe_warmup_seconds: parsed.probe_warmup_seconds, + probe_pulse_period_ms: parsed.probe_pulse_period_ms, + probe_pulse_width_ms: parsed.probe_pulse_width_ms, + probe_event_width_codes: parsed.probe_event_width_codes, })) } -fn parse_command_args(command: CommandKind, args: Vec) -> Result<(i64, i64, String)> { +fn parse_command_args(command: CommandKind, args: Vec) -> Result { + if command == CommandKind::OutputDelayProbe { + return parse_output_delay_probe_args(args); + } + if command != CommandKind::CalibrationAdjust { if let Some(arg) = args.first() { bail!("unexpected argument `{arg}`\n{}", usage()); } - return Ok((0, 0, String::new())); + return Ok(ParsedCommandArgs::default()); } if args.len() < 2 { @@ -142,7 +170,39 @@ fn parse_command_args(command: CommandKind, args: Vec) -> Result<(i64, i .parse::() .with_context(|| format!("parsing video_delta_us `{}`", args[1]))?; let note = args[2..].join(" "); - Ok((audio_delta_us, video_delta_us, note)) + Ok(ParsedCommandArgs { + audio_delta_us, + video_delta_us, + note, + ..ParsedCommandArgs::default() + }) +} + +fn parse_output_delay_probe_args(args: Vec) -> Result { + if args.len() > 5 { + bail!( + "output-delay-probe accepts at most five arguments\n{}", + usage() + ); + } + let parse_u32 = |index: usize, name: &str| -> Result { + args.get(index) + .map(|value| { + value + .parse::() + .with_context(|| format!("parsing {name} `{value}`")) + }) + .transpose() + .map(|value| value.unwrap_or(0)) + }; + Ok(ParsedCommandArgs { + probe_duration_seconds: parse_u32(0, "duration_s")?, + probe_warmup_seconds: parse_u32(1, "warmup_s")?, + probe_pulse_period_ms: parse_u32(2, "period_ms")?, + probe_pulse_width_ms: parse_u32(3, "width_ms")?, + probe_event_width_codes: args.get(4).cloned().unwrap_or_default(), + ..ParsedCommandArgs::default() + }) } #[cfg(test)] @@ -174,7 +234,8 @@ fn capture_power_request(command: CommandKind) -> Option | CommandKind::RecoverUac | CommandKind::RecoverUvc | CommandKind::ResetUsb - | CommandKind::UpstreamSync => return None, + | CommandKind::UpstreamSync + | CommandKind::OutputDelayProbe => return None, CommandKind::Auto => (false, CapturePowerCommand::Auto), CommandKind::On => (true, CapturePowerCommand::ForceOn), CommandKind::Off => (false, CapturePowerCommand::ForceOff), @@ -461,7 +522,8 @@ fn calibration_request_for(config: &Config) -> Option { | CommandKind::RecoverUac | CommandKind::RecoverUvc | CommandKind::ResetUsb - | CommandKind::UpstreamSync => return None, + | CommandKind::UpstreamSync + | CommandKind::OutputDelayProbe => return None, }; Some(CalibrationRequest { action: action as i32, @@ -508,7 +570,8 @@ async fn main() -> Result<()> { | CommandKind::RecoverUac | CommandKind::RecoverUvc | CommandKind::ResetUsb - | CommandKind::UpstreamSync => unreachable!(), + | CommandKind::UpstreamSync + | CommandKind::OutputDelayProbe => unreachable!(), }; let reply = client .set_capture_power(Request::new(request)) @@ -529,6 +592,30 @@ async fn main() -> Result<()> { return Ok(()); } + if config.command == CommandKind::OutputDelayProbe { + let request = OutputDelayProbeRequest { + duration_seconds: config.probe_duration_seconds, + warmup_seconds: config.probe_warmup_seconds, + pulse_period_ms: config.probe_pulse_period_ms, + pulse_width_ms: config.probe_pulse_width_ms, + event_width_codes: config.probe_event_width_codes.clone(), + }; + let mut stream = client + .run_output_delay_probe(Request::new(request)) + .await + .context("running server-generated UVC/UAC output-delay probe")? + .into_inner(); + while let Some(reply) = stream + .message() + .await + .context("reading output-delay probe reply")? + { + println!("ok={}", reply.ok); + println!("detail={}", reply.detail); + } + return Ok(()); + } + let reply = match config.command { CommandKind::Status => client .get_capture_power(Request::new(Empty {})) @@ -592,6 +679,7 @@ async fn main() -> Result<()> { CommandKind::Version | CommandKind::Auto | CommandKind::On | CommandKind::Off => { unreachable!() } + CommandKind::OutputDelayProbe => unreachable!(), CommandKind::CalibrationAdjust | CommandKind::CalibrationRestoreDefault | CommandKind::CalibrationRestoreFactory @@ -647,6 +735,14 @@ mod tests { Some(CommandKind::UpstreamSync) ); assert_eq!(CommandKind::parse("sync"), Some(CommandKind::UpstreamSync)); + assert_eq!( + CommandKind::parse("output-delay-probe"), + Some(CommandKind::OutputDelayProbe) + ); + assert_eq!( + CommandKind::parse("probe-output-delay"), + Some(CommandKind::OutputDelayProbe) + ); assert_eq!(CommandKind::parse("force-on"), Some(CommandKind::On)); assert_eq!(CommandKind::parse("force-off"), Some(CommandKind::Off)); assert_eq!( @@ -686,6 +782,28 @@ mod tests { assert_eq!(config.command, CommandKind::UpstreamSync); } + #[test] + fn parse_args_accepts_output_delay_probe_config() { + let config = parse_args_from([ + "--server", + "http://lab:50051", + "output-delay-probe", + "20", + "4", + "1000", + "120", + "1,2,3,4", + ]) + .expect("probe config"); + + assert_eq!(config.command, CommandKind::OutputDelayProbe); + assert_eq!(config.probe_duration_seconds, 20); + assert_eq!(config.probe_warmup_seconds, 4); + assert_eq!(config.probe_pulse_period_ms, 1000); + assert_eq!(config.probe_pulse_width_ms, 120); + assert_eq!(config.probe_event_width_codes, "1,2,3,4"); + } + #[test] fn parse_args_accepts_calibration_adjustment() { let config = parse_args_from([ @@ -711,6 +829,7 @@ mod tests { assert!(parse_args_from(["status", "extra"]).is_err()); assert!(parse_args_from(["calibrate"]).is_err()); assert!(parse_args_from(["calibrate", "0", "not-int"]).is_err()); + assert!(parse_args_from(["output-delay-probe", "1", "2", "3", "4", "1", "extra"]).is_err()); } #[test] @@ -770,6 +889,7 @@ mod tests { assert!(capture_power_request(CommandKind::RecoverUvc).is_none()); assert!(capture_power_request(CommandKind::ResetUsb).is_none()); assert!(capture_power_request(CommandKind::UpstreamSync).is_none()); + assert!(capture_power_request(CommandKind::OutputDelayProbe).is_none()); } #[test] @@ -810,6 +930,11 @@ mod tests { audio_delta_us: 0, video_delta_us: 71_600, note: "probe".to_string(), + probe_duration_seconds: 0, + probe_warmup_seconds: 0, + probe_pulse_period_ms: 0, + probe_pulse_width_ms: 0, + probe_event_width_codes: String::new(), }; let request = calibration_request_for(&config).expect("request"); assert_eq!(request.action, CalibrationAction::AdjustActive as i32); diff --git a/client/src/launcher/tests/preview.rs b/client/src/launcher/tests/preview.rs index b184bc9..e2827f2 100644 --- a/client/src/launcher/tests/preview.rs +++ b/client/src/launcher/tests/preview.rs @@ -47,6 +47,13 @@ impl Relay for ProbeRelay { Pin> + Send>>; type StreamWebcamMediaStream = Pin> + Send>>; + type RunOutputDelayProbeStream = Pin< + Box< + dyn futures::Stream< + Item = Result, + > + Send, + >, + >; async fn stream_keyboard( &self, @@ -99,6 +106,13 @@ impl Relay for ProbeRelay { Ok(Response::new(Box::pin(stream::empty()))) } + async fn run_output_delay_probe( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(Box::pin(stream::empty()))) + } + async fn paste_text( &self, _request: Request, diff --git a/client/src/launcher/tests/utility_actions.rs b/client/src/launcher/tests/utility_actions.rs index 7be6420..7a8d5f6 100644 --- a/client/src/launcher/tests/utility_actions.rs +++ b/client/src/launcher/tests/utility_actions.rs @@ -5,8 +5,9 @@ use super::super::{ use futures::stream; use lesavka_common::lesavka::{ AudioPacket, CalibrationRequest, CalibrationState, CapturePowerState, Empty, KeyboardReport, - MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest, - UpstreamMediaBundle, UpstreamSyncState, VideoPacket, + MonitorRequest, MouseReport, OutputDelayProbeReply, OutputDelayProbeRequest, PasteReply, + PasteRequest, ResetUsbReply, SetCapturePowerRequest, UpstreamMediaBundle, UpstreamSyncState, + VideoPacket, relay_server::{Relay, RelayServer}, }; use serial_test::serial; @@ -22,6 +23,8 @@ type MouseStream = Pin> + Send>>; type AudioStream = Pin> + Send>>; type EmptyStream = Pin> + Send>>; +type OutputDelayProbeStream = + Pin> + Send>>; #[derive(Clone)] struct UtilityRelay { @@ -49,6 +52,7 @@ impl Relay for UtilityRelay { type StreamMicrophoneStream = EmptyStream; type StreamCameraStream = EmptyStream; type StreamWebcamMediaStream = EmptyStream; + type RunOutputDelayProbeStream = OutputDelayProbeStream; async fn stream_keyboard( &self, @@ -99,6 +103,13 @@ impl Relay for UtilityRelay { Ok(Response::new(Box::pin(stream::empty()))) } + async fn run_output_delay_probe( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(Box::pin(stream::empty()))) + } + async fn paste_text( &self, _request: Request, diff --git a/common/Cargo.toml b/common/Cargo.toml index 56bb6d8..ca6dd6f 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.19.1" +version = "0.19.2" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 47367d9..f512f33 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -63,6 +63,21 @@ message UpstreamMediaBundle { uint32 video_fps = 11; } +message OutputDelayProbeRequest { + // The server generates the probe locally and feeds UVC/UAC directly. This + // measures the output-device path only, not client capture or uplink transit. + uint32 duration_seconds = 1; + uint32 warmup_seconds = 2; + uint32 pulse_period_ms = 3; + uint32 pulse_width_ms = 4; + string event_width_codes = 5; +} + +message OutputDelayProbeReply { + bool ok = 1; + string detail = 2; +} + message ResetUsbReply { bool ok = 1; } // true = success message PasteRequest { @@ -187,6 +202,7 @@ service Relay { rpc StreamMicrophone (stream AudioPacket) returns (stream Empty); rpc StreamCamera (stream VideoPacket) returns (stream Empty); rpc StreamWebcamMedia(stream UpstreamMediaBundle) returns (stream Empty); + rpc RunOutputDelayProbe(OutputDelayProbeRequest) returns (stream OutputDelayProbeReply); rpc PasteText (PasteRequest) returns (PasteReply); rpc RecoverUsb (Empty) returns (ResetUsbReply); diff --git a/docs/operational-env.md b/docs/operational-env.md index d4a83a6..64063ad 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -176,7 +176,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta | `LESAVKA_MIC_TEST_SOURCE_DESC` | client media capture/playback override | | `LESAVKA_MOUSE_DEVICE` | input routing/clipboard override | | `LESAVKA_OUTPUT_DELAY_APPLY` | manual direct UVC/UAC probe override; apply the measured server output-delay correction through the calibration API when the probe gates pass | -| `LESAVKA_OUTPUT_DELAY_CALIBRATION` | manual direct UVC/UAC probe override; emit `output-delay-calibration.json` from a lab-attached USB host capture so the server can save static UVC/UAC output-path defaults, defaults to enabled | +| `LESAVKA_OUTPUT_DELAY_CALIBRATION` | manual direct UVC/UAC probe override; emit `output-delay-calibration.json` from a lab-attached USB host capture of server-generated signatures, defaults to enabled | | `LESAVKA_OUTPUT_DELAY_GAIN` | manual direct UVC/UAC probe override; scales measured output-delay correction before applying, defaults to `1.0` | | `LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS` | manual direct UVC/UAC probe safety limit; refuses to apply/save implausibly large measured device skew, defaults to `5000` | | `LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS` | manual direct UVC/UAC probe stability limit; refuses to apply/save unstable output-delay measurements, defaults to `80` | diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index 149be34..084d7b2 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -2,8 +2,9 @@ # scripts/manual/run_upstream_av_sync.sh # Manual: upstream A/V sync hardware probe; not part of CI. # -# Manual: capture the real Tethys webcam/mic endpoints while the shared-clock -# sync probe streams upstream media through Lesavka, then analyze the skew. +# Manual: capture the real Tethys UVC/UAC endpoints while the Lesavka server +# generates paired probe media locally and feeds its own UVC/UAC output sinks. +# This intentionally measures only the server-to-host output-device skew. set -euo pipefail @@ -16,9 +17,12 @@ LESAVKA_SERVER_CONNECT_HOST=${LESAVKA_SERVER_CONNECT_HOST:-38.28.125.112} LESAVKA_SERVER_ADDR=${LESAVKA_SERVER_ADDR:-auto} LESAVKA_SERVER_SCHEME=${LESAVKA_SERVER_SCHEME:-https} LESAVKA_TLS_DOMAIN=${LESAVKA_TLS_DOMAIN:-lesavka-server} -PROBE_DURATION_SECONDS=${PROBE_DURATION_SECONDS:-10} +PROBE_DURATION_SECONDS=${PROBE_DURATION_SECONDS:-20} PROBE_WARMUP_SECONDS=${PROBE_WARMUP_SECONDS:-4} PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + 20))} +PROBE_PULSE_PERIOD_MS=${PROBE_PULSE_PERIOD_MS:-1000} +PROBE_PULSE_WIDTH_MS=${PROBE_PULSE_WIDTH_MS:-120} +PROBE_EVENT_WIDTH_CODES=${PROBE_EVENT_WIDTH_CODES:-1,2,1,3,2,4,1,1,3,1,4,2,1,2,3,4,1,3,2,2,4,1,2,4,3,1,1,4,2,3,1,2} # Do not open the UVC host capture far ahead of the probe. The gadget side only # has frames once the sync probe is feeding the server, and some hosts time out # VIDIOC_STREAMON if the camera is starved during pre-roll. @@ -31,21 +35,19 @@ VIDEO_SIZE=${VIDEO_SIZE:-auto} VIDEO_FPS=${VIDEO_FPS:-auto} VIDEO_FORMAT=${VIDEO_FORMAT:-mjpeg} REMOTE_CAPTURE_STACK=${REMOTE_CAPTURE_STACK:-pulse} -REMOTE_PULSE_CAPTURE_TOOL=${REMOTE_PULSE_CAPTURE_TOOL:-ffmpeg} +REMOTE_PULSE_CAPTURE_TOOL=${REMOTE_PULSE_CAPTURE_TOOL:-gst} REMOTE_PULSE_VIDEO_MODE=${REMOTE_PULSE_VIDEO_MODE:-copy} REMOTE_AUDIO_SOURCE=${REMOTE_AUDIO_SOURCE:-auto} REMOTE_AUDIO_QUIESCE_USER_AUDIO=${REMOTE_AUDIO_QUIESCE_USER_AUDIO:-auto} ANALYSIS_NORMALIZE=${ANALYSIS_NORMALIZE:-0} ANALYSIS_SCALE_WIDTH=${ANALYSIS_SCALE_WIDTH:-1280} SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=30"} -LOCAL_AUDIO_SANITY=${LOCAL_AUDIO_SANITY:-1} PROBE_PREBUILD=${PROBE_PREBUILD:-1} -PROBE_BIN=${PROBE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-probe"} ANALYZE_BIN=${ANALYZE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-analyze"} REMOTE_ANALYZE=${REMOTE_ANALYZE:-1} REMOTE_ANALYZE_BIN=${REMOTE_ANALYZE_BIN:-/tmp/lesavka-sync-analyze} REMOTE_ANALYZE_COPY=${REMOTE_ANALYZE_COPY:-1} -FETCH_CAPTURE=${FETCH_CAPTURE:-0} +FETCH_CAPTURE=${FETCH_CAPTURE:-1} REMOTE_SERVER_PREFLIGHT=${REMOTE_SERVER_PREFLIGHT:-1} REMOTE_EXPECT_CAM_OUTPUT=${REMOTE_EXPECT_CAM_OUTPUT:-uvc} REMOTE_EXPECT_UVC_CODEC=${REMOTE_EXPECT_UVC_CODEC:-mjpeg} @@ -61,8 +63,8 @@ LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000} CAPTURE_READY_MARKER="__LESAVKA_CAPTURE_READY__" STAMP="$(date +%Y%m%d-%H%M%S)" -REMOTE_CAPTURE=${REMOTE_CAPTURE:-"/tmp/lesavka-sync-probe-${STAMP}.mkv"} -LOCAL_REPORT_DIR="${LOCAL_OUTPUT_DIR%/}/lesavka-sync-probe-${STAMP}" +REMOTE_CAPTURE=${REMOTE_CAPTURE:-"/tmp/lesavka-output-delay-probe-${STAMP}.mkv"} +LOCAL_REPORT_DIR="${LOCAL_OUTPUT_DIR%/}/lesavka-output-delay-probe-${STAMP}" LOCAL_CAPTURE="${LOCAL_REPORT_DIR}/capture.mkv" LOCAL_ANALYSIS_JSON="${LOCAL_REPORT_DIR}/report.json" LOCAL_REPORT_TXT="${LOCAL_REPORT_DIR}/report.txt" @@ -367,6 +369,8 @@ artifact = { "scope": "server-output-static-baseline", "applies_to": "server UVC/UAC gadget output path", "measurement_host_role": "lab-attached USB host", + "probe_media_origin": "server-generated", + "probe_media_path": "server generated signatures -> UVC/UAC sinks -> lab host capture", "report_json": report_path, "audio_after_video_positive": True, "target": target, @@ -457,23 +461,14 @@ maybe_apply_output_delay_calibration() { fi } -if [[ "${LOCAL_AUDIO_SANITY}" != "0" ]]; then - echo "==> verifying local speaker-to-mic sanity before upstream sync run" - "${SCRIPT_DIR}/run_local_audio_sanity.sh" -fi - if [[ "${PROBE_PREBUILD}" != "0" ]]; then - echo "==> prebuilding sync probe/analyzer before opening the capture window" + echo "==> prebuilding relay control/analyzer before opening the capture window" ( cd "${REPO_ROOT}" - cargo build -p lesavka_client --bin lesavka-sync-probe --bin lesavka-sync-analyze --bin lesavka-relayctl + cargo build -p lesavka_client --bin lesavka-sync-analyze --bin lesavka-relayctl ) fi -if [[ ! -x "${PROBE_BIN}" ]]; then - echo "sync probe binary not found at ${PROBE_BIN}" >&2 - exit 1 -fi if [[ ! -x "${ANALYZE_BIN}" ]]; then echo "sync analyzer binary not found at ${ANALYZE_BIN}" >&2 exit 1 @@ -1034,16 +1029,21 @@ wait_for_capture_ready sleep "${LEAD_IN_SECONDS}" -echo "==> running local Lesavka sync probe against ${RESOLVED_LESAVKA_SERVER_ADDR}" +echo "==> running server-generated UVC/UAC output-delay probe against ${RESOLVED_LESAVKA_SERVER_ADDR}" probe_status=0 probe_timed_out=0 ( cd "${REPO_ROOT}" LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ - timeout --signal=INT "${PROBE_TIMEOUT_SECONDS}" "${PROBE_BIN}" \ - --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ - --duration-seconds "${PROBE_DURATION_SECONDS}" \ - --warmup-seconds "${PROBE_WARMUP_SECONDS}" + timeout --signal=INT "${PROBE_TIMEOUT_SECONDS}" \ + "${REPO_ROOT}/target/debug/lesavka-relayctl" \ + --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ + output-delay-probe \ + "${PROBE_DURATION_SECONDS}" \ + "${PROBE_WARMUP_SECONDS}" \ + "${PROBE_PULSE_PERIOD_MS}" \ + "${PROBE_PULSE_WIDTH_MS}" \ + "${PROBE_EVENT_WIDTH_CODES}" ) || probe_status=$? if [[ "${probe_status}" -eq 124 ]]; then probe_timed_out=1 @@ -1098,7 +1098,7 @@ REMOTE_NORMALIZE_SCRIPT fi echo "==> analyzing capture on ${TETHYS_HOST}" ssh ${SSH_OPTS} "${TETHYS_HOST}" \ - "chmod +x '${REMOTE_ANALYZE_BIN}' && '${REMOTE_ANALYZE_BIN}' '${remote_fetch_capture}' --json" \ + "chmod +x '${REMOTE_ANALYZE_BIN}' && '${REMOTE_ANALYZE_BIN}' '${remote_fetch_capture}' --json --event-width-codes '${PROBE_EVENT_WIDTH_CODES}'" \ > "${LOCAL_ANALYSIS_JSON}" fi @@ -1110,9 +1110,9 @@ fi if [[ "${probe_status}" -ne 0 ]]; then if [[ "${probe_timed_out}" -eq 1 ]]; then - echo "sync probe timed out after ${PROBE_TIMEOUT_SECONDS}s; this usually means one upstream stream did not close cleanly after capture starvation." >&2 + echo "server output-delay probe timed out after ${PROBE_TIMEOUT_SECONDS}s; this usually means one UVC/UAC output sink did not close cleanly." >&2 fi - echo "sync probe failed with status ${probe_status}" >&2 + echo "server output-delay probe failed with status ${probe_status}" >&2 [[ -f "${LOCAL_CAPTURE}" ]] && echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2 exit "${probe_status}" fi @@ -1193,7 +1193,7 @@ else echo "==> analyzing capture" ( cd "${REPO_ROOT}" - "${ANALYZE_BIN}" "${LOCAL_CAPTURE}" --report-dir "${LOCAL_REPORT_DIR}" + "${ANALYZE_BIN}" "${LOCAL_CAPTURE}" --report-dir "${LOCAL_REPORT_DIR}" --event-width-codes "${PROBE_EVENT_WIDTH_CODES}" ) fi diff --git a/server/Cargo.toml b/server/Cargo.toml index 5bc6056..0d14322 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.19.1" +version = "0.19.2" edition = "2024" autobins = false diff --git a/server/src/lib.rs b/server/src/lib.rs index ab7126c..fca4d7b 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -13,6 +13,7 @@ pub mod capture_power; pub mod gadget; pub mod handshake; pub(crate) mod media_timing; +pub mod output_delay_probe; pub mod paste; pub mod runtime_support; pub mod security; diff --git a/server/src/main.rs b/server/src/main.rs index f144b99..f99d0b0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -19,8 +19,9 @@ use tracing::{debug, error, info, warn}; use lesavka_common::lesavka::{ AudioPacket, CalibrationRequest, CalibrationState, CapturePowerCommand, CapturePowerState, - Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, - SetCapturePowerRequest, UpstreamMediaBundle, UpstreamSyncState, VideoPacket, + Empty, KeyboardReport, MonitorRequest, MouseReport, OutputDelayProbeReply, + OutputDelayProbeRequest, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest, + UpstreamMediaBundle, UpstreamSyncState, VideoPacket, relay_server::{Relay, RelayServer}, }; diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index ccb99ea..1f92dd3 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -216,6 +216,7 @@ impl Relay for Handler { type StreamMicrophoneStream = ReceiverStream>; type StreamCameraStream = ReceiverStream>; type StreamWebcamMediaStream = ReceiverStream>; + type RunOutputDelayProbeStream = ReceiverStream>; async fn stream_keyboard( &self, @@ -564,6 +565,110 @@ impl Relay for Handler { Ok(Response::new(ReceiverStream::new(rx))) } + /// Generate deterministic media on the server and feed UVC/UAC directly. + async fn run_output_delay_probe( + &self, + req: Request, + ) -> Result, Status> { + let rpc_id = runtime_support::next_stream_id(); + let request = req.into_inner(); + let camera_cfg = camera::current_camera_config(); + let microphone_lease = self.upstream_media_rt.activate_microphone(); + let camera_lease = self.upstream_media_rt.activate_camera(); + info!( + rpc_id, + session_id = camera_lease.session_id, + camera_generation = camera_lease.generation, + microphone_generation = microphone_lease.generation, + output = camera_cfg.output.as_str(), + codec = camera_cfg.codec.as_str(), + width = camera_cfg.width, + height = camera_cfg.height, + fps = camera_cfg.fps, + "🧪 server output-delay probe opened" + ); + let (camera_session_id, relay, _relay_reused) = + match self.camera_rt.activate(&camera_cfg).await { + Ok(active) => active, + Err(err) => { + self.upstream_media_rt.close_camera(camera_lease.generation); + self.upstream_media_rt.close_microphone(microphone_lease.generation); + return Err(err); + } + }; + let Some(microphone_sink_permit) = self + .upstream_media_rt + .reserve_microphone_sink(microphone_lease.generation) + .await + else { + self.upstream_media_rt.close_camera(camera_lease.generation); + self.upstream_media_rt.close_microphone(microphone_lease.generation); + return Err(Status::aborted( + "output-delay probe superseded before microphone sink became available", + )); + }; + let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); + let mut sink = runtime_support::open_voice_with_retry(&uac_dev) + .await + .map_err(|e| { + self.upstream_media_rt.close_camera(camera_lease.generation); + self.upstream_media_rt.close_microphone(microphone_lease.generation); + Status::internal(format!("{e:#}")) + })?; + let camera_rt = self.camera_rt.clone(); + let upstream_media_rt = self.upstream_media_rt.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + + tokio::spawn(async move { + let _microphone_sink_permit = microphone_sink_permit; + let result = if !camera_rt.is_active(camera_session_id) + || !upstream_media_rt.is_camera_active(camera_lease.generation) + || !upstream_media_rt.is_microphone_active(microphone_lease.generation) + { + Err(anyhow::anyhow!("output-delay probe superseded before start")) + } else { + lesavka_server::output_delay_probe::run_server_output_delay_probe( + relay, + &mut sink, + &camera_cfg, + &request, + ) + .await + }; + upstream_media_rt.close_camera(camera_lease.generation); + upstream_media_rt.close_microphone(microphone_lease.generation); + match result { + Ok(summary) => { + let detail = format!( + "server-generated UVC/UAC output-delay probe complete: video_frames={} audio_packets={} events={}", + summary.video_frames, summary.audio_packets, summary.event_count + ); + info!( + rpc_id, + session_id = camera_lease.session_id, + camera_session_id, + detail, + "🧪 server output-delay probe closed" + ); + tx.send(Ok(OutputDelayProbeReply { ok: true, detail })) + .await + .ok(); + } + Err(err) => { + warn!( + rpc_id, + session_id = camera_lease.session_id, + camera_session_id, + "🧪 server output-delay probe failed: {err:#}" + ); + tx.send(Err(Status::internal(format!("{err:#}")))).await.ok(); + } + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } + /// Accept synthetic upstream microphone packets without ALSA hardware. async fn stream_microphone( &self, diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index a2212f2..b857955 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -46,6 +46,7 @@ impl Relay for Handler { type StreamMicrophoneStream = ReceiverStream>; type StreamCameraStream = ReceiverStream>; type StreamWebcamMediaStream = ReceiverStream>; + type RunOutputDelayProbeStream = ReceiverStream>; async fn stream_keyboard( &self, @@ -309,6 +310,34 @@ impl Relay for Handler { Ok(Response::new(ReceiverStream::new(rx))) } + async fn run_output_delay_probe( + &self, + req: Request, + ) -> Result, Status> { + let cfg = camera::current_camera_config(); + let (_session_id, relay, _relay_reused) = self.camera_rt.activate(&cfg).await?; + let mut sink = lesavka_server::audio::Voice::new("coverage-uac") + .await + .map_err(|e| Status::internal(format!("{e:#}")))?; + let summary = lesavka_server::output_delay_probe::run_server_output_delay_probe( + relay, + &mut sink, + &cfg, + &req.into_inner(), + ) + .await + .map_err(|e| Status::internal(format!("{e:#}")))?; + let (tx, rx) = tokio::sync::mpsc::channel(1); + let detail = format!( + "server-generated UVC/UAC output-delay probe complete: video_frames={} audio_packets={} events={}", + summary.video_frames, summary.audio_packets, summary.event_count + ); + tx.send(Ok(OutputDelayProbeReply { ok: true, detail })) + .await + .ok(); + Ok(Response::new(ReceiverStream::new(rx))) + } + async fn capture_video( &self, req: Request, diff --git a/server/src/output_delay_probe.rs b/server/src/output_delay_probe.rs new file mode 100644 index 0000000..250d288 --- /dev/null +++ b/server/src/output_delay_probe.rs @@ -0,0 +1,520 @@ +use anyhow::{Context, Result, bail}; +use gstreamer as gst; +use gstreamer::prelude::*; +use gstreamer_app as gst_app; +use lesavka_common::lesavka::{AudioPacket, OutputDelayProbeRequest, VideoPacket}; +use std::f64::consts::TAU; +use std::sync::Arc; +use std::time::Duration; + +use crate::audio::Voice; +use crate::camera::{CameraCodec, CameraConfig}; +use crate::video::CameraRelay; + +const DEFAULT_DURATION_SECONDS: u32 = 20; +const DEFAULT_WARMUP_SECONDS: u32 = 4; +const DEFAULT_PULSE_PERIOD_MS: u32 = 1_000; +const DEFAULT_PULSE_WIDTH_MS: u32 = 120; +const DEFAULT_EVENT_WIDTH_CODES: &[u32] = &[ + 1, 2, 1, 3, 2, 4, 1, 1, 3, 1, 4, 2, 1, 2, 3, 4, 1, 3, 2, 2, 4, 1, 2, 4, 3, 1, 1, 4, 2, 3, 1, 2, +]; +const AUDIO_SAMPLE_RATE: u32 = 48_000; +const AUDIO_CHANNELS: usize = 2; +const AUDIO_CHUNK_MS: u64 = 10; +const AUDIO_AMPLITUDE: f64 = 24_000.0; +const DARK_FRAME_RGB: Rgb = Rgb { r: 4, g: 8, b: 12 }; +const EVENT_COLORS: [Rgb; 4] = [ + Rgb { + r: 255, + g: 45, + b: 45, + }, + Rgb { + r: 0, + g: 230, + b: 118, + }, + Rgb { + r: 41, + g: 121, + b: 255, + }, + Rgb { + r: 255, + g: 179, + b: 0, + }, +]; +const EVENT_FREQUENCIES_HZ: [f64; 4] = [660.0, 880.0, 1_100.0, 1_320.0]; + +#[derive(Clone, Copy, Debug)] +struct Rgb { + r: u8, + g: u8, + b: u8, +} + +#[derive(Clone, Debug)] +struct ProbeConfig { + duration: Duration, + warmup: Duration, + pulse_period: Duration, + pulse_width: Duration, + event_width_codes: Vec, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct OutputDelayProbeSummary { + pub video_frames: u64, + pub audio_packets: u64, + pub event_count: u64, +} + +impl ProbeConfig { + fn from_request(request: &OutputDelayProbeRequest) -> Result { + let duration_seconds = non_zero_or_default( + request.duration_seconds, + DEFAULT_DURATION_SECONDS, + "duration_seconds", + )?; + let warmup_seconds = if request.warmup_seconds == 0 { + DEFAULT_WARMUP_SECONDS + } else { + request.warmup_seconds + }; + let pulse_period_ms = non_zero_or_default( + request.pulse_period_ms, + DEFAULT_PULSE_PERIOD_MS, + "pulse_period_ms", + )?; + let pulse_width_ms = non_zero_or_default( + request.pulse_width_ms, + DEFAULT_PULSE_WIDTH_MS, + "pulse_width_ms", + )?; + if pulse_width_ms >= pulse_period_ms { + bail!("pulse_width_ms must stay smaller than pulse_period_ms"); + } + + let event_width_codes = parse_event_width_codes(&request.event_width_codes)?; + Ok(Self { + duration: Duration::from_secs(u64::from(duration_seconds)), + warmup: Duration::from_secs(u64::from(warmup_seconds)), + pulse_period: Duration::from_millis(u64::from(pulse_period_ms)), + pulse_width: Duration::from_millis(u64::from(pulse_width_ms)), + event_width_codes, + }) + } + + fn event_code_at(&self, pts: Duration) -> Option { + if pts < self.warmup { + return None; + } + let since_warmup = pts.saturating_sub(self.warmup); + let period_ns = self.pulse_period.as_nanos().max(1); + let pulse_index = (since_warmup.as_nanos() / period_ns) as usize; + let pulse_offset_ns = since_warmup.as_nanos() % period_ns; + let code = self.event_width_codes[pulse_index % self.event_width_codes.len()]; + let active_ns = self.pulse_width.as_nanos().saturating_mul(u128::from(code)); + (pulse_offset_ns < active_ns).then_some(code) + } + + fn event_count(&self) -> u64 { + if self.duration <= self.warmup { + return 0; + } + let active = self.duration - self.warmup; + let count = active.as_nanos() / self.pulse_period.as_nanos().max(1); + count.try_into().unwrap_or(u64::MAX) + } +} + +fn non_zero_or_default(value: u32, default: u32, name: &str) -> Result { + if value == 0 { + return Ok(default); + } + if value == u32::MAX { + bail!("{name} is too large"); + } + Ok(value) +} + +fn parse_event_width_codes(raw: &str) -> Result> { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return Ok(DEFAULT_EVENT_WIDTH_CODES.to_vec()); + } + let codes = trimmed + .split(',') + .filter_map(|part| { + let part = part.trim(); + (!part.is_empty()).then_some(part) + }) + .map(|part| { + let code = part + .parse::() + .with_context(|| format!("parsing event width code `{part}`"))?; + if !(1..=4).contains(&code) { + bail!("event width code {code} is unsupported; use values 1..4"); + } + Ok(code) + }) + .collect::>>()?; + if codes.is_empty() { + bail!("event_width_codes must contain at least one code"); + } + Ok(codes) +} + +/// Generate a server-local A/V signature and feed the physical UVC/UAC sinks. +/// +/// Inputs: the active camera relay, active UAC voice sink, camera profile, and +/// probe request timing. +/// Outputs: a small count summary after the last generated packet. +/// Why: this probe intentionally bypasses client capture/uplink so the measured +/// skew is the static output-path difference between server UVC and UAC. +#[cfg(not(coverage))] +pub async fn run_server_output_delay_probe( + relay: Arc, + sink: &mut Voice, + camera: &CameraConfig, + request: &OutputDelayProbeRequest, +) -> Result { + let config = ProbeConfig::from_request(request)?; + if config.event_count() == 0 { + bail!("probe duration must extend beyond warmup"); + } + + let frame_step = Duration::from_nanos(1_000_000_000u64 / u64::from(camera.fps.max(1))); + let audio_chunk = Duration::from_millis(AUDIO_CHUNK_MS); + let samples_per_chunk = ((u64::from(AUDIO_SAMPLE_RATE) * AUDIO_CHUNK_MS) / 1_000) as usize; + let frames = EncodedProbeFrames::new(camera)?; + let start = tokio::time::Instant::now(); + let mut frame_index = 0u64; + let mut audio_index = 0u64; + let mut video_frames = 0u64; + let mut audio_packets = 0u64; + + loop { + let next_frame_pts = duration_mul(frame_step, frame_index); + let next_audio_pts = duration_mul(audio_chunk, audio_index); + let next_pts = next_frame_pts.min(next_audio_pts); + if next_pts > config.duration { + break; + } + tokio::time::sleep_until(start + next_pts).await; + + if next_audio_pts <= next_frame_pts && next_audio_pts <= config.duration { + let pts_us = duration_us(next_audio_pts); + let data = render_audio_chunk(&config, next_audio_pts, samples_per_chunk); + sink.push(&AudioPacket { + id: 0, + pts: pts_us, + data, + seq: audio_index.saturating_add(1), + client_capture_pts_us: pts_us, + client_send_pts_us: pts_us, + client_queue_depth: 0, + client_queue_age_ms: 0, + }); + audio_packets = audio_packets.saturating_add(1); + audio_index = audio_index.saturating_add(1); + } + + if next_frame_pts <= next_audio_pts && next_frame_pts <= config.duration { + let pts_us = duration_us(next_frame_pts); + let code = config.event_code_at(next_frame_pts); + relay.feed(VideoPacket { + id: 0, + pts: pts_us, + data: frames.packet_for_code(code)?.to_vec(), + seq: frame_index.saturating_add(1), + effective_fps: camera.fps, + client_capture_pts_us: pts_us, + client_send_pts_us: pts_us, + client_queue_depth: 0, + client_queue_age_ms: 0, + ..Default::default() + }); + video_frames = video_frames.saturating_add(1); + frame_index = frame_index.saturating_add(1); + } + } + + sink.finish(); + Ok(OutputDelayProbeSummary { + video_frames, + audio_packets, + event_count: config.event_count(), + }) +} + +#[cfg(coverage)] +pub async fn run_server_output_delay_probe( + _relay: Arc, + _sink: &mut Voice, + _camera: &CameraConfig, + request: &OutputDelayProbeRequest, +) -> Result { + let config = ProbeConfig::from_request(request)?; + Ok(OutputDelayProbeSummary { + video_frames: 1, + audio_packets: 1, + event_count: config.event_count(), + }) +} + +#[cfg(not(coverage))] +struct EncodedProbeFrames { + dark: Vec, + events: [Vec; 4], +} + +#[cfg(not(coverage))] +impl EncodedProbeFrames { + fn new(camera: &CameraConfig) -> Result { + if !matches!(camera.codec, CameraCodec::Mjpeg) { + bail!( + "server-generated output-delay probe currently requires MJPEG UVC output, got {}", + camera.codec.as_str() + ); + } + + let mut encoder = MjpegFrameEncoder::new(camera)?; + let dark = encoder.encode_solid(DARK_FRAME_RGB, 0)?; + let events = [ + encoder.encode_solid(EVENT_COLORS[0], 1)?, + encoder.encode_solid(EVENT_COLORS[1], 2)?, + encoder.encode_solid(EVENT_COLORS[2], 3)?, + encoder.encode_solid(EVENT_COLORS[3], 4)?, + ]; + Ok(Self { dark, events }) + } + + fn packet_for_code(&self, code: Option) -> Result<&[u8]> { + let Some(code) = code else { + return Ok(&self.dark); + }; + let index = usize::try_from(code.saturating_sub(1)).unwrap_or(usize::MAX); + self.events + .get(index) + .map(Vec::as_slice) + .with_context(|| format!("unsupported event code {code}")) + } +} + +#[cfg(not(coverage))] +struct MjpegFrameEncoder { + src: gst_app::AppSrc, + sink: gst_app::AppSink, + pipeline: gst::Pipeline, + width: usize, + height: usize, + frame_step_us: u64, +} + +#[cfg(not(coverage))] +impl MjpegFrameEncoder { + fn new(camera: &CameraConfig) -> Result { + gst::init().context("gst init")?; + let width = camera.width as i32; + let height = camera.height as i32; + let fps = camera.fps.max(1) as i32; + let raw_caps = gst::Caps::builder("video/x-raw") + .field("format", "RGB") + .field("width", width) + .field("height", height) + .field("framerate", gst::Fraction::new(fps, 1)) + .build(); + let jpeg_caps = gst::Caps::builder("image/jpeg") + .field("parsed", true) + .field("width", width) + .field("height", height) + .field("framerate", gst::Fraction::new(fps, 1)) + .build(); + let pipeline = gst::Pipeline::new(); + let src = gst::ElementFactory::make("appsrc") + .name("output_delay_probe_src") + .build()? + .downcast::() + .expect("appsrc"); + src.set_is_live(false); + src.set_format(gst::Format::Time); + src.set_property("do-timestamp", false); + src.set_caps(Some(&raw_caps)); + let convert = gst::ElementFactory::make("videoconvert").build()?; + let encoder = gst::ElementFactory::make("jpegenc") + .property("quality", 95i32) + .build()?; + let capsfilter = gst::ElementFactory::make("capsfilter") + .property("caps", &jpeg_caps) + .build()?; + let sink = gst::ElementFactory::make("appsink") + .name("output_delay_probe_sink") + .property("sync", false) + .property("emit-signals", false) + .property("max-buffers", 8u32) + .build()? + .downcast::() + .expect("appsink"); + pipeline.add_many([ + src.upcast_ref(), + &convert, + &encoder, + &capsfilter, + sink.upcast_ref(), + ])?; + gst::Element::link_many([ + src.upcast_ref(), + &convert, + &encoder, + &capsfilter, + sink.upcast_ref(), + ])?; + pipeline + .set_state(gst::State::Playing) + .context("starting output-delay probe MJPEG encoder")?; + + Ok(Self { + src, + sink, + pipeline, + width: camera.width as usize, + height: camera.height as usize, + frame_step_us: (1_000_000u64 / u64::from(camera.fps.max(1))).max(1), + }) + } + + fn encode_solid(&mut self, color: Rgb, sequence: u64) -> Result> { + let pts_us = sequence.saturating_mul(self.frame_step_us); + let frame = solid_rgb_frame(self.width, self.height, color); + let mut buffer = gst::Buffer::from_slice(frame); + if let Some(meta) = buffer.get_mut() { + let pts = gst::ClockTime::from_useconds(pts_us); + meta.set_pts(Some(pts)); + meta.set_dts(Some(pts)); + meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us))); + } + self.src + .push_buffer(buffer) + .context("encoding output-delay probe frame")?; + let sample = self + .sink + .pull_sample() + .context("pulling encoded output-delay probe frame")?; + let buffer = sample.buffer().context("encoded frame had no buffer")?; + let map = buffer + .map_readable() + .context("mapping encoded output-delay probe frame")?; + Ok(map.as_slice().to_vec()) + } +} + +#[cfg(not(coverage))] +impl Drop for MjpegFrameEncoder { + fn drop(&mut self) { + let _ = self.src.end_of_stream(); + let _ = self.pipeline.set_state(gst::State::Null); + } +} + +#[cfg(not(coverage))] +fn solid_rgb_frame(width: usize, height: usize, color: Rgb) -> Vec { + let mut frame = vec![0u8; width.saturating_mul(height).saturating_mul(3)]; + for pixel in frame.chunks_exact_mut(3) { + pixel[0] = color.r; + pixel[1] = color.g; + pixel[2] = color.b; + } + frame +} + +fn render_audio_chunk( + config: &ProbeConfig, + chunk_pts: Duration, + samples_per_chunk: usize, +) -> Vec { + let sample_step = Duration::from_nanos(1_000_000_000u64 / u64::from(AUDIO_SAMPLE_RATE)); + let mut pcm = + Vec::with_capacity(samples_per_chunk * AUDIO_CHANNELS * std::mem::size_of::()); + for sample_index in 0..samples_per_chunk { + let sample_pts = chunk_pts + duration_mul(sample_step, sample_index as u64); + let sample = config + .event_code_at(sample_pts) + .and_then(event_frequency_hz) + .map(|frequency| { + let phase = TAU * frequency * sample_pts.as_secs_f64(); + (phase.sin() * AUDIO_AMPLITUDE) as i16 + }) + .unwrap_or(0); + for _ in 0..AUDIO_CHANNELS { + pcm.extend_from_slice(&sample.to_le_bytes()); + } + } + pcm +} + +fn event_frequency_hz(code: u32) -> Option { + EVENT_FREQUENCIES_HZ + .get(code.checked_sub(1)? as usize) + .copied() +} + +fn duration_us(duration: Duration) -> u64 { + duration.as_micros().min(u128::from(u64::MAX)) as u64 +} + +fn duration_mul(duration: Duration, count: u64) -> Duration { + let nanos = duration + .as_nanos() + .saturating_mul(u128::from(count)) + .min(u128::from(u64::MAX)); + Duration::from_nanos(nanos as u64) +} + +#[cfg(test)] +mod tests { + use super::{ProbeConfig, render_audio_chunk}; + use lesavka_common::lesavka::OutputDelayProbeRequest; + use std::time::Duration; + + #[test] + fn request_defaults_to_long_coded_server_probe() { + let config = + ProbeConfig::from_request(&OutputDelayProbeRequest::default()).expect("default config"); + + assert_eq!(config.duration, Duration::from_secs(20)); + assert_eq!(config.warmup, Duration::from_secs(4)); + assert_eq!(config.event_count(), 16); + assert_eq!(config.event_code_at(Duration::from_secs(4)), Some(1)); + assert_eq!(config.event_code_at(Duration::from_secs(5)), Some(2)); + } + + #[test] + fn event_codes_reject_unsupported_signatures() { + let request = OutputDelayProbeRequest { + event_width_codes: "1,5".to_string(), + ..Default::default() + }; + + assert!(ProbeConfig::from_request(&request).is_err()); + } + + #[test] + fn audio_chunk_contains_tone_only_during_coded_pulse() { + let config = ProbeConfig::from_request(&OutputDelayProbeRequest { + duration_seconds: 6, + warmup_seconds: 1, + pulse_period_ms: 1_000, + pulse_width_ms: 120, + event_width_codes: "3".to_string(), + }) + .expect("config"); + + let active = render_audio_chunk(&config, Duration::from_secs(1), 480); + let idle = render_audio_chunk(&config, Duration::from_millis(500), 480); + + assert!(active.iter().any(|byte| *byte != 0)); + assert!(idle.iter().all(|byte| *byte == 0)); + } +} diff --git a/testing/tests/client_manual_sync_script_contract.rs b/testing/tests/client_manual_sync_script_contract.rs index 781338f..7be3526 100644 --- a/testing/tests/client_manual_sync_script_contract.rs +++ b/testing/tests/client_manual_sync_script_contract.rs @@ -29,7 +29,7 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "tunneled to ${LESAVKA_SERVER_HOST}:127.0.0.1:${SERVER_TUNNEL_REMOTE_PORT}", "CAPTURE_READY_MARKER=\"__LESAVKA_CAPTURE_READY__\"", "LOCAL_OUTPUT_DIR=${LOCAL_OUTPUT_DIR:-/tmp}", - "LOCAL_REPORT_DIR=\"${LOCAL_OUTPUT_DIR%/}/lesavka-sync-probe-${STAMP}\"", + "LOCAL_REPORT_DIR=\"${LOCAL_OUTPUT_DIR%/}/lesavka-output-delay-probe-${STAMP}\"", "LOCAL_ANALYSIS_JSON=\"${LOCAL_REPORT_DIR}/report.json\"", "LOCAL_EVENTS_CSV=\"${LOCAL_REPORT_DIR}/events.csv\"", "LOCAL_OUTPUT_DELAY_JSON=\"${LOCAL_REPORT_DIR}/output-delay-calibration.json\"", @@ -49,14 +49,20 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "scope\": \"server-output-static-baseline\"", "applies_to\": \"server UVC/UAC gadget output path\"", "measurement_host_role\": \"lab-attached USB host\"", + "probe_media_origin\": \"server-generated\"", + "probe_media_path\": \"server generated signatures -> UVC/UAC sinks -> lab host capture\"", "audio_after_video_positive", + "PROBE_EVENT_WIDTH_CODES=${PROBE_EVENT_WIDTH_CODES:-1,2,1,3", + "REMOTE_PULSE_CAPTURE_TOOL=${REMOTE_PULSE_CAPTURE_TOOL:-gst}", "output_delay_calibration_json", "direct UVC/UAC output-delay calibration", "calibration-save-default", "LEAD_IN_SECONDS=${LEAD_IN_SECONDS:-0}", "PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + 20))}", - "timeout --signal=INT \"${PROBE_TIMEOUT_SECONDS}\" \"${PROBE_BIN}\"", - "sync probe timed out after ${PROBE_TIMEOUT_SECONDS}s", + "output-delay-probe", + "server-generated UVC/UAC output-delay probe", + "server output-delay probe timed out after ${PROBE_TIMEOUT_SECONDS}s", + "--event-width-codes '${PROBE_EVENT_WIDTH_CODES}'", "VIDIOC_STREAMON.*Connection timed out", "the UVC host opened before MJPEG frames reached the gadget", "Tethys capture failed before the sync probe could start", @@ -72,6 +78,7 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "==> Lesavka versions under test", "lesavka-relayctl", "--bin lesavka-relayctl", + "--bin lesavka-sync-analyze", "client_revision=", "server_version=", "server_revision=", @@ -88,6 +95,16 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { ), "auto server resolution should not guess a public gRPC host when SSH is already required" ); + for forbidden in [ + "LOCAL_AUDIO_SANITY", + "run_local_audio_sanity.sh", + "lesavka-sync-probe", + ] { + assert!( + !SYNC_SCRIPT.contains(forbidden), + "direct UVC/UAC output-delay probe must not run workstation/client-side probe behavior: {forbidden}" + ); + } } #[test]