test: measure output smoothness continuity

This commit is contained in:
Brad Stein 2026-05-03 21:16:46 -03:00
parent 1ec6baa06c
commit cf56b6691e
8 changed files with 446 additions and 37 deletions

View File

@ -120,6 +120,9 @@ path.
- [x] Split freshness reporting into fixed sync delay, last-hop device - [x] Split freshness reporting into fixed sync delay, last-hop device
overhead, and total RC target event age so freshness work cannot hide overhead, and total RC target event age so freshness work cannot hide
the sync cost. the sync cost.
- [x] Add dense server-generated smoothness evidence on the normal UVC/UAC
path: per-frame video continuity watermark, quiet audio pilot, cadence
jitter, duplicate/missing frame estimates, and low-RMS audio gap counts.
- [ ] Keep UI/profile controls authoritative for UVC output profiles beyond - [ ] Keep UI/profile controls authoritative for UVC output profiles beyond
`640x480@20`; validate `1280x720@30` and `1920x1080@20/30` after sync is `640x480@20`; validate `1280x720@30` and `1920x1080@20/30` after sync is
locked. locked.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.19.9" version = "0.19.10"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.19.9" version = "0.19.10"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.19.9" version = "0.19.10"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.19.9" version = "0.19.10"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.19.9" version = "0.19.10"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -821,6 +821,7 @@ write_output_delay_correlation() {
"${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \ "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \
"${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" \ "${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" \
"${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" \ "${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" \
"${LOCAL_CAPTURE}" \
"${LOCAL_CAPTURE_LOG}" \ "${LOCAL_CAPTURE_LOG}" \
"${LOCAL_CLOCK_ALIGNMENT_JSON}" \ "${LOCAL_CLOCK_ALIGNMENT_JSON}" \
"${LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS}" \ "${LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS}" \
@ -830,6 +831,9 @@ import csv
import json import json
import math import math
import pathlib import pathlib
import statistics
import struct
import subprocess
import sys import sys
( (
@ -838,6 +842,7 @@ import sys
output_json_path, output_json_path,
output_csv_path, output_csv_path,
output_txt_path, output_txt_path,
capture_path,
capture_log_path, capture_log_path,
clock_alignment_path, clock_alignment_path,
max_freshness_age_raw, max_freshness_age_raw,
@ -879,6 +884,24 @@ def load_json_or_empty(path):
return {} return {}
def run_json(command, description):
    # Execute `command` and parse its stdout as JSON; on any failure return an
    # "unavailable" marker instead of raising so callers can keep reporting.
    try:
        stdout = subprocess.check_output(command, stderr=subprocess.PIPE)
    except Exception as error:
        return {"available": False, "error": f"{description} failed: {error}"}
    try:
        parsed = json.loads(stdout)
    except Exception as error:
        return {"available": False, "error": f"{description} JSON parse failed: {error}"}
    return parsed
def run_bytes(command, description):
    """Run `command` and return its stdout as raw bytes.

    Raises RuntimeError (chained to the underlying cause) when the command
    cannot be executed or exits non-zero, tagged with `description`.
    """
    try:
        completed = subprocess.check_output(command, stderr=subprocess.PIPE)
    except Exception as error:
        raise RuntimeError(f"{description} failed: {error}") from error
    return completed
def parse_capture_start_unix_ns(path): def parse_capture_start_unix_ns(path):
try: try:
lines = pathlib.Path(path).read_text(errors="replace").splitlines() lines = pathlib.Path(path).read_text(errors="replace").splitlines()
@ -905,6 +928,25 @@ def percentile(values, pct):
return values[lower] * (1.0 - fraction) + values[upper] * fraction return values[lower] * (1.0 - fraction) + values[upper] * fraction
def numeric_stats(values, suffix=""):
    """Summarise finite numeric values as median/p95/max.

    None and non-finite entries are dropped first. The stat keys are suffixed
    (e.g. suffix="_ms" yields "median_ms") so results can be merged into
    larger artifacts without key collisions.
    """
    finite_values = [item for item in values if item is not None and math.isfinite(item)]
    if not finite_values:
        return {
            "available": False,
            "count": 0,
            f"median{suffix}": None,
            f"p95{suffix}": None,
            f"max{suffix}": None,
        }
    return {
        "available": True,
        "count": len(finite_values),
        f"median{suffix}": percentile(finite_values, 50.0),
        f"p95{suffix}": percentile(finite_values, 95.0),
        f"max{suffix}": max(finite_values),
    }
def stats(rows, key): def stats(rows, key):
values = [row.get(key) for row in rows if row.get(key) is not None] values = [row.get(key) for row in rows if row.get(key) is not None]
values = [value for value in values if math.isfinite(value)] values = [value for value in values if math.isfinite(value)]
@ -994,6 +1036,294 @@ def correlation(rows, left_key, right_key):
return sum((x - mean_x) * (y - mean_y) for x, y in pairs) / math.sqrt(denom_x * denom_y) return sum((x - mean_x) * (y - mean_y) for x, y in pairs) / math.sqrt(denom_x * denom_y)
def ffprobe_frame_timestamps(path):
    # Best-effort per-frame presentation times (seconds) for the first video
    # stream of `path`; entries ffprobe reports as missing or "N/A" are skipped.
    command = [
        "ffprobe",
        "-hide_banner",
        "-loglevel",
        "error",
        "-select_streams",
        "v:0",
        "-show_frames",
        "-show_entries",
        "frame=best_effort_timestamp_time",
        "-of",
        "json",
        path,
    ]
    data = run_json(command, "ffprobe video frames")
    timestamps = []
    for frame in data.get("frames", []):
        stamp = frame.get("best_effort_timestamp_time")
        if stamp not in (None, "N/A"):
            timestamps.append(float(stamp))
    return timestamps
def ffprobe_packet_times(path, stream):
    # Collect (pts_seconds, duration_seconds_or_None) per packet of `stream`;
    # packets without a parseable pts are dropped entirely, while a missing or
    # unparseable duration degrades to None.
    data = run_json(
        [
            "ffprobe",
            "-hide_banner",
            "-loglevel",
            "error",
            "-select_streams",
            stream,
            "-show_packets",
            "-show_entries",
            "packet=pts_time,duration_time",
            "-of",
            "json",
            path,
        ],
        f"ffprobe {stream} packets",
    )
    results = []
    for packet in data.get("packets", []):
        try:
            pts = float(packet["pts_time"])
        except Exception:
            continue
        try:
            duration = float(packet.get("duration_time"))
        except Exception:
            duration = None
        results.append((pts, duration))
    return results
def continuity_frame_ids(path):
    """Decode the per-frame continuity watermark stripe into sequence numbers.

    Extracts the bottom 32-pixel stripe of the first video stream as 320x32
    grayscale frames, splits each frame into 20 horizontal blocks (white
    anchor, black anchor, 16 data bits MSB-first, parity, inverted parity),
    and returns one entry per frame: the decoded 16-bit id, or None when the
    stripe is unreadable or fails its parity check. Returns [] when the
    extraction itself fails or produces misaligned data.
    """
    width, height, blocks = 320, 32, 20
    try:
        raw = run_bytes(
            [
                "ffmpeg",
                "-hide_banner",
                "-loglevel",
                "error",
                "-i",
                path,
                "-map",
                "0:v:0",
                "-vf",
                f"crop=iw:32:0:ih-32,scale={width}:{height}:flags=area,format=gray",
                "-f",
                "rawvideo",
                "-pix_fmt",
                "gray",
                "-",
            ],
            "ffmpeg video continuity extraction",
        )
    except RuntimeError:
        return []
    frame_pixels = width * height
    if not raw or len(raw) % frame_pixels != 0:
        return []
    block_width = width // blocks
    ids = []
    for offset in range(0, len(raw), frame_pixels):
        frame = raw[offset : offset + frame_pixels]
        # Average each block's pixels; summing row slices avoids a per-pixel loop.
        averages = []
        for block in range(blocks):
            x_start = block * block_width
            x_end = width if block + 1 == blocks else (block + 1) * block_width
            span = x_end - x_start
            total = sum(
                sum(frame[y * width + x_start : y * width + x_end])
                for y in range(height)
            )
            averages.append(total / max(1, span * height))
        white, black = averages[0], averages[1]
        # The anchor blocks must be clearly separated or the stripe is unreadable.
        if white < 150 or black > 105 or white - black < 70:
            ids.append(None)
            continue
        threshold = (white + black) / 2.0
        value = 0
        for block in range(2, 18):
            value = (value << 1) | int(averages[block] > threshold)
        parity = averages[18] > threshold
        inverse = averages[19] > threshold
        # Parity and its inverse must disagree, and parity must match the data bits.
        if parity == inverse or parity != bool(value.bit_count() & 1):
            ids.append(None)
        else:
            ids.append(value)
    return ids
def sequence_smoothness(ids):
    """Summarise continuity of decoded 16-bit frame ids.

    Consecutive decoded ids are compared modulo 2**16: a zero step counts as
    a duplicate, a forward step of k (1 < k < 32768) implies k-1 missing
    frames, and any step of 32768 or more is treated as a backwards
    regression. None entries (undecodable frames) are counted separately.
    """
    decoded = [frame_id for frame_id in ids if frame_id is not None]
    duplicates = missing = regressions = 0
    jumps = []
    for earlier, later in zip(decoded, decoded[1:]):
        step = (later - earlier) & 0xFFFF
        jumps.append(step)
        if step == 0:
            duplicates += 1
        elif step >= 32768:
            regressions += 1
        elif step > 1:
            missing += step - 1
    return {
        "decoded_frames": len(decoded),
        "undecodable_frames": len(ids) - len(decoded),
        "unique_frames": len(set(decoded)),
        "duplicate_frames": duplicates,
        "estimated_missing_frames": missing,
        "sequence_regressions": regressions,
        "largest_forward_jump": max(jumps) if jumps else 0,
    }
def interval_smoothness(timestamps, expected_ms, window_start_s=None, window_end_s=None):
    """Characterise cadence regularity of a timestamp sequence (seconds).

    Optionally restricts to [window_start_s, window_end_s], then measures the
    gaps between strictly increasing neighbours in milliseconds. When no
    positive expected interval is supplied, the median gap is used instead.
    Hiccups are gaps above 1.5x the expected interval, short intervals fall
    below 0.5x, and missing events are estimated from oversized gaps.
    """
    if window_start_s is not None and window_end_s is not None:
        timestamps = [ts for ts in timestamps if window_start_s <= ts <= window_end_s]
    intervals = []
    for left, right in zip(timestamps, timestamps[1:]):
        if right > left:
            intervals.append((right - left) * 1000.0)
    if intervals and (expected_ms is None or expected_ms <= 0.0):
        expected_ms = statistics.median(intervals)
    if expected_ms:
        jitter = [abs(interval - expected_ms) for interval in intervals]
        hiccup_threshold = expected_ms * 1.5
        short_threshold = expected_ms * 0.5
    else:
        jitter = []
        hiccup_threshold = float("inf")
        short_threshold = -1.0
    missing_estimate = 0
    if expected_ms:
        for interval in intervals:
            if interval > hiccup_threshold:
                missing_estimate += max(0, round(interval / expected_ms) - 1)
    return {
        "timestamps": len(timestamps),
        "expected_interval_ms": expected_ms,
        "interval_stats": numeric_stats(intervals, "_interval_ms"),
        "jitter_stats": numeric_stats(jitter, "_jitter_ms"),
        "hiccup_count": sum(1 for interval in intervals if interval > hiccup_threshold),
        "short_interval_count": sum(1 for interval in intervals if interval < short_threshold),
        "estimated_missing_by_timestamp": missing_estimate,
    }
def audio_rms_smoothness(path, window_start_s, window_end_s):
    """Measure audio continuity via 10 ms RMS windows over the analysis span.

    Decodes the first audio stream to mono 48 kHz s16le PCM, trims it to the
    requested window, and reports RMS statistics plus the count of windows
    whose RMS falls below the quiet-pilot threshold (likely gaps/dropouts).
    Returns an "unavailable" marker when extraction fails or the window is
    empty.
    """
    sample_rate = 48_000
    window_ms = 10
    samples_per_window = sample_rate * window_ms // 1000
    command = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "error",
        "-i",
        path,
        "-map",
        "0:a:0",
        "-ac",
        "1",
        "-ar",
        str(sample_rate),
        "-f",
        "s16le",
        "-acodec",
        "pcm_s16le",
        "-",
    ]
    try:
        raw = run_bytes(command, "ffmpeg audio continuity extraction")
    except RuntimeError as error:
        return {"available": False, "error": str(error)}
    start_sample = max(0, int(window_start_s * sample_rate))
    end_sample = min(len(raw) // 2, int(window_end_s * sample_rate))
    if end_sample <= start_sample:
        return {"available": False, "error": "empty audio smoothness window"}
    sample_count = end_sample - start_sample
    samples = struct.unpack(f"<{sample_count}h", raw[start_sample * 2 : end_sample * 2])
    rms_values = []
    # Only full windows contribute; a trailing partial window is discarded.
    limit = len(samples) - samples_per_window + 1
    for index in range(0, limit, samples_per_window):
        window = samples[index : index + samples_per_window]
        rms_values.append(math.sqrt(sum(value * value for value in window) / len(window)))
    low_threshold = 90.0
    return {
        "available": bool(rms_values),
        "window_ms": window_ms,
        "rms_stats": numeric_stats(rms_values, "_rms"),
        "low_rms_window_count": sum(1 for value in rms_values if value < low_threshold),
        "low_rms_threshold": low_threshold,
    }
def analyze_smoothness(path, report, timeline):
    """Summarise video/audio cadence and continuity for the capture at `path`.

    `report` supplies paired probe events (used to locate the analysis
    window); `timeline` supplies camera fps plus duration/warmup in
    microseconds. Returns the lesavka.output-smoothness-summary.v1 artifact
    combining ffprobe cadence data, the decoded continuity watermark, and
    audio RMS continuity.
    """
    camera_fps = as_float(timeline.get("camera_fps"), 0.0)
    duration_s = as_float(timeline.get("duration_us"), 0.0) / 1_000_000.0
    warmup_s = as_float(timeline.get("warmup_us"), 0.0) / 1_000_000.0
    # Finite video/audio times from paired events anchor the analysis window.
    paired_times = [
        value
        for event in report.get("paired_events", [])
        for value in [finite(event.get("video_time_s")), finite(event.get("audio_time_s"))]
        if value is not None
    ]
    if paired_times:
        # Window starts warmup_s before the earliest paired event (clamped to 0)
        # and spans the configured probe duration.
        window_start_s = max(0.0, min(paired_times) - warmup_s)
        window_end_s = window_start_s + duration_s
    else:
        # No paired events: analyse the whole capture unwindowed.
        window_start_s = None
        window_end_s = None
    video_timestamps = ffprobe_frame_timestamps(path)
    audio_packets = ffprobe_packet_times(path, "a:0")
    audio_packet_timestamps = [pts for pts, _duration in audio_packets]
    video_ids = continuity_frame_ids(path)
    # Trim watermark ids to the window only when they pair 1:1 with timestamps;
    # a length mismatch means we cannot safely align ids to times.
    if window_start_s is not None and window_end_s is not None and len(video_ids) == len(video_timestamps):
        video_ids = [
            frame_id
            for frame_id, timestamp in zip(video_ids, video_timestamps)
            if window_start_s <= timestamp <= window_end_s
        ]
    expected_video_ms = 1000.0 / camera_fps if camera_fps > 0.0 else None
    # Expected audio cadence derives from the median reported packet duration.
    audio_durations = [
        duration * 1000.0
        for _pts, duration in audio_packets
        if duration is not None and math.isfinite(duration) and duration > 0.0
    ]
    expected_audio_ms = statistics.median(audio_durations) if audio_durations else None
    # Without a window, run RMS analysis up to the last audio packet (or 0.0).
    audio_window_end = window_end_s if window_end_s is not None else max(audio_packet_timestamps or [0.0])
    return {
        "schema": "lesavka.output-smoothness-summary.v1",
        "scope": "captured RC target media cadence and continuity over the server-generated probe window",
        "window_start_s": window_start_s,
        "window_end_s": window_end_s,
        "video": {
            **interval_smoothness(video_timestamps, expected_video_ms, window_start_s, window_end_s),
            **sequence_smoothness(video_ids),
        },
        "audio": {
            "packet_cadence": interval_smoothness(
                audio_packet_timestamps,
                expected_audio_ms,
                window_start_s,
                window_end_s,
            ),
            "rms_continuity": audio_rms_smoothness(path, window_start_s or 0.0, audio_window_end),
        },
    }
capture_start_unix_ns = parse_capture_start_unix_ns(capture_log_path) capture_start_unix_ns = parse_capture_start_unix_ns(capture_log_path)
clock_alignment = load_json_or_empty(clock_alignment_path) clock_alignment = load_json_or_empty(clock_alignment_path)
clock_alignment_available = bool(clock_alignment.get("available")) clock_alignment_available = bool(clock_alignment.get("available"))
@ -1197,6 +1527,8 @@ else:
f"(limit {max_freshness_drift_ms:.1f} ms)" f"(limit {max_freshness_drift_ms:.1f} ms)"
) )
smoothness = analyze_smoothness(capture_path, report, timeline)
artifact = { artifact = {
"schema": "lesavka.output-delay-correlation.v1", "schema": "lesavka.output-delay-correlation.v1",
"report_json": report_path, "report_json": report_path,
@ -1233,6 +1565,7 @@ artifact = {
"video_event_age_model": video_event_age_model, "video_event_age_model": video_event_age_model,
"audio_event_age_model": audio_event_age_model, "audio_event_age_model": audio_event_age_model,
}, },
"smoothness": smoothness,
"server_observed_correlation": server_observed_correlation, "server_observed_correlation": server_observed_correlation,
"server_drift_share_of_observed": server_share, "server_drift_share_of_observed": server_share,
"dominant_layer": dominant_layer, "dominant_layer": dominant_layer,
@ -1299,11 +1632,18 @@ lines = [
f"- freshness status: {freshness_status} ({freshness_reason})", f"- freshness status: {freshness_status} ({freshness_reason})",
f"- clock uncertainty: +/-{clock_uncertainty_ms:.1f} ms", f"- clock uncertainty: +/-{clock_uncertainty_ms:.1f} ms",
f"- intentional sync delays: audio {audio_delay_ms:+.1f} ms, video {video_delay_ms:+.1f} ms", f"- intentional sync delays: audio {audio_delay_ms:+.1f} ms, video {video_delay_ms:+.1f} ms",
f"- device path overhead: video median {video_freshness_stats.get('median_ms') or 0.0:.1f} ms / p95 {video_freshness_stats.get('p95_ms') or 0.0:.1f} ms / max {video_freshness_stats.get('max_ms') or 0.0:.1f} ms; audio median {audio_freshness_stats.get('median_ms') or 0.0:.1f} ms / p95 {audio_freshness_stats.get('p95_ms') or 0.0:.1f} ms / max {audio_freshness_stats.get('max_ms') or 0.0:.1f} ms", f"- media timestamp path offset: video median {video_freshness_stats.get('median_ms') or 0.0:.1f} ms / p95 {video_freshness_stats.get('p95_ms') or 0.0:.1f} ms / max {video_freshness_stats.get('max_ms') or 0.0:.1f} ms; audio median {audio_freshness_stats.get('median_ms') or 0.0:.1f} ms / p95 {audio_freshness_stats.get('p95_ms') or 0.0:.1f} ms / max {audio_freshness_stats.get('max_ms') or 0.0:.1f} ms",
f"- RC target event age: video median {video_event_age_stats.get('median_ms') or 0.0:.1f} ms / p95 {video_event_age_stats.get('p95_ms') or 0.0:.1f} ms / max {video_event_age_stats.get('max_ms') or 0.0:.1f} ms; audio median {audio_event_age_stats.get('median_ms') or 0.0:.1f} ms / p95 {audio_event_age_stats.get('p95_ms') or 0.0:.1f} ms / max {audio_event_age_stats.get('max_ms') or 0.0:.1f} ms", f"- RC target event age: video median {video_event_age_stats.get('median_ms') or 0.0:.1f} ms / p95 {video_event_age_stats.get('p95_ms') or 0.0:.1f} ms / max {video_event_age_stats.get('max_ms') or 0.0:.1f} ms; audio median {audio_event_age_stats.get('median_ms') or 0.0:.1f} ms / p95 {audio_event_age_stats.get('p95_ms') or 0.0:.1f} ms / max {audio_event_age_stats.get('max_ms') or 0.0:.1f} ms",
f"- freshness budget: worst RC event-age p95 {(freshness_worst_event_p95_ms or 0.0):.1f} ms + clock uncertainty {clock_uncertainty_ms:.1f} ms = {(freshness_worst_event_with_uncertainty_ms or 0.0):.1f} ms vs limit {max_freshness_age_ms:.1f} ms", f"- freshness budget: worst RC event-age p95 {(freshness_worst_event_p95_ms or 0.0):.1f} ms + clock uncertainty {clock_uncertainty_ms:.1f} ms = {(freshness_worst_event_with_uncertainty_ms or 0.0):.1f} ms vs limit {max_freshness_age_ms:.1f} ms",
f"- video event-age drift: {video_event_age_model.get('drift_ms', 0.0):+.1f} ms over paired events ({video_event_age_model.get('slope_ms_per_s', 0.0):+.3f} ms/s)", f"- video event-age drift: {video_event_age_model.get('drift_ms', 0.0):+.1f} ms over paired events ({video_event_age_model.get('slope_ms_per_s', 0.0):+.3f} ms/s)",
f"- audio event-age drift: {audio_event_age_model.get('drift_ms', 0.0):+.1f} ms over paired events ({audio_event_age_model.get('slope_ms_per_s', 0.0):+.3f} ms/s)", f"- audio event-age drift: {audio_event_age_model.get('drift_ms', 0.0):+.1f} ms over paired events ({audio_event_age_model.get('slope_ms_per_s', 0.0):+.3f} ms/s)",
"",
"Output smoothness",
f"- window: {smoothness.get('window_start_s') or 0.0:.3f}s to {smoothness.get('window_end_s') or 0.0:.3f}s",
f"- video cadence: frames {smoothness.get('video', {}).get('timestamps', 0)}, expected interval {(smoothness.get('video', {}).get('expected_interval_ms') or 0.0):.1f} ms, p95 jitter {(smoothness.get('video', {}).get('jitter_stats', {}).get('p95_jitter_ms') or 0.0):.1f} ms, max interval {(smoothness.get('video', {}).get('interval_stats', {}).get('max_interval_ms') or 0.0):.1f} ms, hiccups {smoothness.get('video', {}).get('hiccup_count', 0)}",
f"- video continuity: decoded {smoothness.get('video', {}).get('decoded_frames', 0)}, duplicates {smoothness.get('video', {}).get('duplicate_frames', 0)}, estimated missing {smoothness.get('video', {}).get('estimated_missing_frames', 0)}, undecodable {smoothness.get('video', {}).get('undecodable_frames', 0)}",
f"- audio packet cadence: packets {smoothness.get('audio', {}).get('packet_cadence', {}).get('timestamps', 0)}, p95 jitter {(smoothness.get('audio', {}).get('packet_cadence', {}).get('jitter_stats', {}).get('p95_jitter_ms') or 0.0):.1f} ms, max interval {(smoothness.get('audio', {}).get('packet_cadence', {}).get('interval_stats', {}).get('max_interval_ms') or 0.0):.1f} ms, hiccups {smoothness.get('audio', {}).get('packet_cadence', {}).get('hiccup_count', 0)}",
f"- audio pilot continuity: low-RMS windows {smoothness.get('audio', {}).get('rms_continuity', {}).get('low_rms_window_count', 0)}, median RMS {(smoothness.get('audio', {}).get('rms_continuity', {}).get('rms_stats', {}).get('median_rms') or 0.0):.1f}",
] ]
summary = "\n".join(lines) + "\n" summary = "\n".join(lines) + "\n"
pathlib.Path(output_txt_path).write_text(summary) pathlib.Path(output_txt_path).write_text(summary)

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.19.9" version = "0.19.10"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -23,7 +23,11 @@ const AUDIO_SAMPLE_RATE: u32 = 48_000;
const AUDIO_CHANNELS: usize = 2; const AUDIO_CHANNELS: usize = 2;
const AUDIO_CHUNK_MS: u64 = 10; const AUDIO_CHUNK_MS: u64 = 10;
const AUDIO_AMPLITUDE: f64 = 24_000.0; const AUDIO_AMPLITUDE: f64 = 24_000.0;
const AUDIO_PILOT_AMPLITUDE: f64 = 700.0;
const AUDIO_PILOT_FREQUENCY_HZ: f64 = 180.0;
const DARK_FRAME_RGB: Rgb = Rgb { r: 4, g: 8, b: 12 }; const DARK_FRAME_RGB: Rgb = Rgb { r: 4, g: 8, b: 12 };
const VIDEO_CONTINUITY_BLOCKS: usize = 20;
const VIDEO_CONTINUITY_DATA_BITS: usize = 16;
const EVENT_COLORS: [Rgb; 4] = [ const EVENT_COLORS: [Rgb; 4] = [
Rgb { Rgb {
r: 255, r: 255,
@ -342,7 +346,7 @@ pub async fn run_server_output_delay_probe(
let frame_step = Duration::from_nanos(1_000_000_000u64 / u64::from(camera.fps.max(1))); let frame_step = Duration::from_nanos(1_000_000_000u64 / u64::from(camera.fps.max(1)));
let audio_chunk = Duration::from_millis(AUDIO_CHUNK_MS); let audio_chunk = Duration::from_millis(AUDIO_CHUNK_MS);
let samples_per_chunk = ((u64::from(AUDIO_SAMPLE_RATE) * AUDIO_CHUNK_MS) / 1_000) as usize; let samples_per_chunk = ((u64::from(AUDIO_SAMPLE_RATE) * AUDIO_CHUNK_MS) / 1_000) as usize;
let frames = EncodedProbeFrames::new(camera)?; let frames = EncodedProbeFrames::new(camera, &config, frame_step)?;
let server_start_unix_ns = unix_ns_now(); let server_start_unix_ns = unix_ns_now();
let start = tokio::time::Instant::now(); let start = tokio::time::Instant::now();
let mut timeline = OutputDelayProbeTimeline::new(&config, camera, server_start_unix_ns); let mut timeline = OutputDelayProbeTimeline::new(&config, camera, server_start_unix_ns);
@ -402,12 +406,11 @@ pub async fn run_server_output_delay_probe(
if frame_active && next_frame_due <= next_audio_due { if frame_active && next_frame_due <= next_audio_due {
let pts_us = duration_us(next_frame_pts); let pts_us = duration_us(next_frame_pts);
let event_slot = config.event_slot_at(next_frame_pts); let event_slot = config.event_slot_at(next_frame_pts);
let code = event_slot.map(|slot| slot.code);
let seq = frame_index.saturating_add(1); let seq = frame_index.saturating_add(1);
relay.feed(VideoPacket { relay.feed(VideoPacket {
id: 0, id: 0,
pts: pts_us, pts: pts_us,
data: frames.packet_for_code(code)?.to_vec(), data: frames.packet_for_frame(frame_index)?.to_vec(),
seq, seq,
effective_fps: camera.fps, effective_fps: camera.fps,
client_capture_pts_us: pts_us, client_capture_pts_us: pts_us,
@ -463,13 +466,12 @@ pub async fn run_server_output_delay_probe(
#[cfg(not(coverage))] #[cfg(not(coverage))]
struct EncodedProbeFrames { struct EncodedProbeFrames {
dark: Vec<u8>, frames: Vec<Vec<u8>>,
events: [Vec<u8>; 4],
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
impl EncodedProbeFrames { impl EncodedProbeFrames {
fn new(camera: &CameraConfig) -> Result<Self> { fn new(camera: &CameraConfig, config: &ProbeConfig, frame_step: Duration) -> Result<Self> {
if !matches!(camera.codec, CameraCodec::Mjpeg) { if !matches!(camera.codec, CameraCodec::Mjpeg) {
bail!( bail!(
"server-generated output-delay probe currently requires MJPEG UVC output, got {}", "server-generated output-delay probe currently requires MJPEG UVC output, got {}",
@ -478,28 +480,33 @@ impl EncodedProbeFrames {
} }
let mut encoder = MjpegFrameEncoder::new(camera)?; let mut encoder = MjpegFrameEncoder::new(camera)?;
let dark = encoder.encode_solid(DARK_FRAME_RGB, 0)?; let mut frames = Vec::new();
let events = [ let mut frame_index = 0u64;
encoder.encode_solid(EVENT_COLORS[0], 1)?, loop {
encoder.encode_solid(EVENT_COLORS[1], 2)?, let pts = duration_mul(frame_step, frame_index);
encoder.encode_solid(EVENT_COLORS[2], 3)?, if pts > config.duration {
encoder.encode_solid(EVENT_COLORS[3], 4)?, break;
]; }
Ok(Self { dark, events }) let code = config.event_code_at(pts);
frames.push(encoder.encode_probe_frame(probe_color_for_code(code), frame_index)?);
frame_index = frame_index.saturating_add(1);
}
Ok(Self { frames })
} }
fn packet_for_code(&self, code: Option<u32>) -> Result<&[u8]> { fn packet_for_frame(&self, frame_index: u64) -> Result<&[u8]> {
let Some(code) = code else { self.frames
return Ok(&self.dark); .get(usize::try_from(frame_index).unwrap_or(usize::MAX))
};
let index = usize::try_from(code.saturating_sub(1)).unwrap_or(usize::MAX);
self.events
.get(index)
.map(Vec::as_slice) .map(Vec::as_slice)
.with_context(|| format!("unsupported event code {code}")) .with_context(|| format!("missing pre-encoded probe frame {frame_index}"))
} }
} }
fn probe_color_for_code(code: Option<u32>) -> Rgb {
    // Map a 1-based event code to its probe color; no code, code 0, or an
    // out-of-range code all fall back to the dark idle frame.
    if let Some(code) = code {
        if let Some(index) = code.checked_sub(1) {
            if let Some(color) = EVENT_COLORS.get(index as usize) {
                return *color;
            }
        }
    }
    DARK_FRAME_RGB
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
struct MjpegFrameEncoder { struct MjpegFrameEncoder {
src: gst_app::AppSrc, src: gst_app::AppSrc,
@ -582,9 +589,9 @@ impl MjpegFrameEncoder {
}) })
} }
fn encode_solid(&mut self, color: Rgb, sequence: u64) -> Result<Vec<u8>> { fn encode_probe_frame(&mut self, color: Rgb, sequence: u64) -> Result<Vec<u8>> {
let pts_us = sequence.saturating_mul(self.frame_step_us); let pts_us = sequence.saturating_mul(self.frame_step_us);
let frame = solid_rgb_frame(self.width, self.height, color); let frame = probe_rgb_frame(self.width, self.height, color, sequence);
let mut buffer = gst::Buffer::from_slice(frame); let mut buffer = gst::Buffer::from_slice(frame);
if let Some(meta) = buffer.get_mut() { if let Some(meta) = buffer.get_mut() {
let pts = gst::ClockTime::from_useconds(pts_us); let pts = gst::ClockTime::from_useconds(pts_us);
@ -616,16 +623,57 @@ impl Drop for MjpegFrameEncoder {
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn solid_rgb_frame(width: usize, height: usize, color: Rgb) -> Vec<u8> { fn probe_rgb_frame(width: usize, height: usize, color: Rgb, sequence: u64) -> Vec<u8> {
let mut frame = vec![0u8; width.saturating_mul(height).saturating_mul(3)]; let mut frame = vec![0u8; width.saturating_mul(height).saturating_mul(3)];
for pixel in frame.chunks_exact_mut(3) { for pixel in frame.chunks_exact_mut(3) {
pixel[0] = color.r; pixel[0] = color.r;
pixel[1] = color.g; pixel[1] = color.g;
pixel[2] = color.b; pixel[2] = color.b;
} }
draw_frame_continuity_watermark(&mut frame, width, height, sequence);
frame frame
} }
fn draw_frame_continuity_watermark(frame: &mut [u8], width: usize, height: usize, sequence: u64) {
    // Paint a machine-readable stripe along the bottom rows of an RGB frame:
    // block 0 is a white anchor, block 1 a black anchor, blocks 2..=17 carry
    // the low 16 bits of `sequence` (MSB first), block 18 carries the bit
    // parity and block 19 its inverse. Frames too small for the stripe are
    // left untouched.
    if width < VIDEO_CONTINUITY_BLOCKS || height < 8 {
        return;
    }
    let stripe_height = (height / 18).clamp(8, 48);
    let stripe_top = height.saturating_sub(stripe_height);
    let block_width = (width / VIDEO_CONTINUITY_BLOCKS).max(1);
    let seq = (sequence & 0xffff) as u16;
    let parity = (seq.count_ones() & 1) != 0;
    for block in 0..VIDEO_CONTINUITY_BLOCKS {
        let lit = match block {
            0 => true,
            1 => false,
            18 => parity,
            19 => !parity,
            data_block => {
                // MSB-first: block 2 carries the highest data bit.
                let bit = VIDEO_CONTINUITY_DATA_BITS - 1 - (data_block - 2);
                (seq >> bit) & 1 != 0
            }
        };
        let luma = if lit { 255 } else { 0 };
        let x_start = block * block_width;
        // The final block absorbs any remainder so the stripe spans the full width.
        let x_end = if block + 1 == VIDEO_CONTINUITY_BLOCKS {
            width
        } else {
            ((block + 1) * block_width).min(width)
        };
        for y in stripe_top..height {
            let row = y * width;
            for x in x_start..x_end {
                let offset = (row + x) * 3;
                if let Some(pixel) = frame.get_mut(offset..offset + 3) {
                    pixel.fill(luma);
                }
            }
        }
    }
}
fn render_audio_chunk( fn render_audio_chunk(
config: &ProbeConfig, config: &ProbeConfig,
chunk_pts: Duration, chunk_pts: Duration,
@ -636,14 +684,17 @@ fn render_audio_chunk(
Vec::with_capacity(samples_per_chunk * AUDIO_CHANNELS * std::mem::size_of::<i16>()); Vec::with_capacity(samples_per_chunk * AUDIO_CHANNELS * std::mem::size_of::<i16>());
for sample_index in 0..samples_per_chunk { for sample_index in 0..samples_per_chunk {
let sample_pts = chunk_pts + duration_mul(sample_step, sample_index as u64); let sample_pts = chunk_pts + duration_mul(sample_step, sample_index as u64);
let sample = config let pilot_phase = TAU * AUDIO_PILOT_FREQUENCY_HZ * sample_pts.as_secs_f64();
let pilot = pilot_phase.sin() * AUDIO_PILOT_AMPLITUDE;
let event = config
.event_code_at(sample_pts) .event_code_at(sample_pts)
.and_then(event_frequency_hz) .and_then(event_frequency_hz)
.map(|frequency| { .map(|frequency| {
let phase = TAU * frequency * sample_pts.as_secs_f64(); let phase = TAU * frequency * sample_pts.as_secs_f64();
(phase.sin() * AUDIO_AMPLITUDE) as i16 phase.sin() * AUDIO_AMPLITUDE
}) })
.unwrap_or(0); .unwrap_or(0.0);
let sample = (pilot + event).clamp(f64::from(i16::MIN), f64::from(i16::MAX)) as i16;
for _ in 0..AUDIO_CHANNELS { for _ in 0..AUDIO_CHANNELS {
pcm.extend_from_slice(&sample.to_le_bytes()); pcm.extend_from_slice(&sample.to_le_bytes());
} }
@ -732,7 +783,8 @@ mod tests {
let idle = render_audio_chunk(&config, Duration::from_millis(500), 480); let idle = render_audio_chunk(&config, Duration::from_millis(500), 480);
assert!(active.iter().any(|byte| *byte != 0)); assert!(active.iter().any(|byte| *byte != 0));
assert!(idle.iter().all(|byte| *byte == 0)); assert!(idle.iter().any(|byte| *byte != 0));
assert!(rms_i16_le(&active) > rms_i16_le(&idle) * 10.0);
} }
#[test] #[test]
@ -792,4 +844,14 @@ mod tests {
Some(0.5) Some(0.5)
); );
} }
fn rms_i16_le(bytes: &[u8]) -> f64 {
    // Root-mean-square of little-endian i16 PCM samples; any trailing odd
    // byte is ignored and an empty input yields 0.0.
    let mut sum_squares = 0.0f64;
    let mut count = 0usize;
    for chunk in bytes.chunks_exact(2) {
        let sample = f64::from(i16::from_le_bytes([chunk[0], chunk[1]]));
        sum_squares += sample * sample;
        count += 1;
    }
    (sum_squares / count.max(1) as f64).sqrt()
}
} }

View File

@ -80,9 +80,13 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() {
"freshness status", "freshness status",
"clock-corrected server feed to Tethys capture event", "clock-corrected server feed to Tethys capture event",
"intentional sync delays", "intentional sync delays",
"device path overhead", "media timestamp path offset",
"RC target event age", "RC target event age",
"freshness budget", "freshness budget",
"Output smoothness",
"video continuity",
"audio pilot continuity",
"schema\": \"lesavka.output-smoothness-summary.v1\"",
"video_freshness_ms", "video_freshness_ms",
"audio_freshness_ms", "audio_freshness_ms",
"video_event_age_ms", "video_event_age_ms",