diff --git a/AGENTS.md b/AGENTS.md index 783cf22..8e6fa6e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -103,6 +103,8 @@ path. freshness can be tightened without changing the bundled sync architecture. - [x] Make direct UVC/UAC output-delay application absolute by default so stale legacy calibration does not keep a hidden multi-second video delay alive. +- [x] Make the direct output probe report separate sync and clock-corrected + freshness verdicts from the same paired server-generated signatures. - [x] Continue reporting client timing and sink handoff diagnostics from bundled packets. - [ ] Add bundled-mode counters for first bundle, first audio push, first video feed, dropped stale bundles, and bundle queue age. diff --git a/Cargo.lock b/Cargo.lock index 009a3bd..5909b36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.19.5" +version = "0.19.6" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.19.5" +version = "0.19.6" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.19.5" +version = "0.19.6" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 1ef687b..b0b4d8b 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.19.5" +version = "0.19.6" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 7b6b4fd..2fa48d1 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.19.5" +version = "0.19.6" edition = "2024" build = "build.rs" diff --git a/docs/operational-env.md b/docs/operational-env.md index 6f7ecbb..1492301 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -185,6 +185,8 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta | `LESAVKA_OUTPUT_DELAY_MIN_PAIRS` | manual direct UVC/UAC probe evidence floor before applying measured output-delay calibration, defaults to `8` | | `LESAVKA_OUTPUT_DELAY_SAVE` | manual direct UVC/UAC probe override; after applying a ready measured correction, persist it as the server default calibration | | `LESAVKA_OUTPUT_DELAY_TARGET` | manual direct UVC/UAC probe override; choose whether measured skew is corrected by shifting `video` or `audio`, defaults to `video` | +| `LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS` | manual direct UVC/UAC probe freshness gate; maximum clock-corrected server-feed-to-Tethys-observed p95 age, defaults to `1000` | +| `LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS` | manual direct UVC/UAC probe freshness gate; maximum allowed freshness drift across paired probe events, defaults to `100` | | `LESAVKA_PASTE_DELAY_MS` | input routing/clipboard override | | `LESAVKA_PASTE_KEY` | input routing/clipboard override | | `LESAVKA_PASTE_KEY_FILE` | input routing/clipboard override | diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index 9a7efc7..51e1c44 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -63,6 +63,8 @@ LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS=${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS:-500 LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80} LESAVKA_OUTPUT_DELAY_GAIN=${LESAVKA_OUTPUT_DELAY_GAIN:-1.0} LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000} +LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS:-1000} +LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS:-100} CAPTURE_READY_MARKER="__LESAVKA_CAPTURE_READY__" STAMP="$(date +%Y%m%d-%H%M%S)" @@ -77,6 +79,7 @@ LOCAL_SERVER_TIMELINE_JSON="${LOCAL_REPORT_DIR}/server-output-timeline.json" LOCAL_OUTPUT_DELAY_CORRELATION_JSON="${LOCAL_REPORT_DIR}/output-delay-correlation.json" LOCAL_OUTPUT_DELAY_CORRELATION_CSV="${LOCAL_REPORT_DIR}/output-delay-correlation.csv" LOCAL_OUTPUT_DELAY_CORRELATION_TXT="${LOCAL_REPORT_DIR}/output-delay-correlation.txt" +LOCAL_CLOCK_ALIGNMENT_JSON="${LOCAL_REPORT_DIR}/clock-alignment.json" LOCAL_OUTPUT_DELAY_JSON="${LOCAL_REPORT_DIR}/output-delay-calibration.json" LOCAL_OUTPUT_DELAY_ENV="${LOCAL_REPORT_DIR}/output-delay-calibration.env" LOCAL_CAPTURE_LOG="${LOCAL_REPORT_DIR}/capture.log" @@ -108,6 +111,72 @@ with socket.socket() as sock: PY } +sample_host_clock_offset_ns() { + local host=$1 + local before_ns remote_ns after_ns + before_ns="$(date +%s%N)" + remote_ns="$(ssh ${SSH_OPTS} "${host}" 'date +%s%N')" + after_ns="$(date +%s%N)" + python3 - <<'PY' "${before_ns}" "${remote_ns}" "${after_ns}" +import sys + +before_ns, remote_ns, after_ns = (int(value) for value in sys.argv[1:]) +local_mid_ns = (before_ns + after_ns) // 2 +print(f"{remote_ns - local_mid_ns} {(after_ns - before_ns) // 2} {after_ns - before_ns}") +PY +} + +write_clock_alignment() { + echo "==> sampling Theia/Tethys clock alignment for freshness" + local theia_sample tethys_sample + if ! theia_sample="$(sample_host_clock_offset_ns "${LESAVKA_SERVER_HOST}")"; then + echo " ↪ clock alignment unavailable: failed to sample ${LESAVKA_SERVER_HOST}" + printf '{"schema":"lesavka.clock-alignment.v1","available":false,"reason":"failed to sample server host"}\n' >"${LOCAL_CLOCK_ALIGNMENT_JSON}" + return 0 + fi + if ! tethys_sample="$(sample_host_clock_offset_ns "${TETHYS_HOST}")"; then + echo " ↪ clock alignment unavailable: failed to sample ${TETHYS_HOST}" + printf '{"schema":"lesavka.clock-alignment.v1","available":false,"reason":"failed to sample capture host"}\n' >"${LOCAL_CLOCK_ALIGNMENT_JSON}" + return 0 + fi + + python3 - <<'PY' \ + "${LESAVKA_SERVER_HOST}" \ + "${TETHYS_HOST}" \ + "${theia_sample}" \ + "${tethys_sample}" \ + "${LOCAL_CLOCK_ALIGNMENT_JSON}" +import json +import pathlib +import sys + +server_host, capture_host, server_sample, capture_sample, output_path = sys.argv[1:] +server_offset_ns, server_uncertainty_ns, server_rtt_ns = (int(value) for value in server_sample.split()) +capture_offset_ns, capture_uncertainty_ns, capture_rtt_ns = (int(value) for value in capture_sample.split()) +theia_to_tethys_offset_ns = capture_offset_ns - server_offset_ns +uncertainty_ns = server_uncertainty_ns + capture_uncertainty_ns +artifact = { + "schema": "lesavka.clock-alignment.v1", + "available": True, + "method": "ssh remote date midpoint", + "server_host": server_host, + "capture_host": capture_host, + "server_clock_offset_from_local_ns": server_offset_ns, + "capture_clock_offset_from_local_ns": capture_offset_ns, + "theia_to_tethys_offset_ns": theia_to_tethys_offset_ns, + "uncertainty_ns": uncertainty_ns, + "uncertainty_ms": uncertainty_ns / 1_000_000.0, + "server_sample_rtt_ns": server_rtt_ns, + "capture_sample_rtt_ns": capture_rtt_ns, +} +pathlib.Path(output_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n") +print( + f" ↪ theia_to_tethys_offset_ms={theia_to_tethys_offset_ns / 1_000_000.0:+.3f}" +) +print(f" ↪ clock_alignment_uncertainty_ms={uncertainty_ns / 1_000_000.0:.3f}") +PY +} + wait_for_server_tunnel() { local local_port=$1 local tries=50 @@ -471,14 +540,28 @@ write_output_delay_correlation() { "${LOCAL_SERVER_TIMELINE_JSON}" \ "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \ "${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" \ - "${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" + "${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" \ + "${LOCAL_CAPTURE_LOG}" \ + "${LOCAL_CLOCK_ALIGNMENT_JSON}" \ + "${LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS}" \ + "${LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS}" import csv import json import math import pathlib import sys -report_path, timeline_path, output_json_path, output_csv_path, output_txt_path = sys.argv[1:] +( + report_path, + timeline_path, + output_json_path, + output_csv_path, + output_txt_path, + capture_log_path, + clock_alignment_path, + max_freshness_age_raw, + max_freshness_drift_raw, +) = sys.argv[1:] report = json.loads(pathlib.Path(report_path).read_text()) timeline = json.loads(pathlib.Path(timeline_path).read_text()) server_events = {int(event["event_id"]): event for event in timeline.get("events", [])} @@ -492,6 +575,80 @@ def finite(value): return result if math.isfinite(result) else None +def as_float(value, default): + try: + result = float(str(value).strip()) + except Exception: + return default + return result if math.isfinite(result) else default + + +def as_int_or_none(value): + try: + return int(str(value).strip()) + except Exception: + return None + + +def load_json_or_empty(path): + try: + return json.loads(pathlib.Path(path).read_text()) + except Exception: + return {} + + +def parse_capture_start_unix_ns(path): + try: + lines = pathlib.Path(path).read_text(errors="replace").splitlines() + except Exception: + return None + for line in lines: + if line.startswith("capture_start_unix_ns="): + return as_int_or_none(line.split("=", 1)[1]) + return None + + +def percentile(values, pct): + values = sorted(value for value in values if value is not None and math.isfinite(value)) + if not values: + return None + if len(values) == 1: + return values[0] + rank = (len(values) - 1) * (pct / 100.0) + lower = math.floor(rank) + upper = math.ceil(rank) + if lower == upper: + return values[int(rank)] + fraction = rank - lower + return values[lower] * (1.0 - fraction) + values[upper] * fraction + + +def stats(rows, key): + values = [row.get(key) for row in rows if row.get(key) is not None] + values = [value for value in values if math.isfinite(value)] + if not values: + return { + "available": False, + "count": 0, + "first_ms": None, + "last_ms": None, + "mean_ms": None, + "median_ms": None, + "p95_ms": None, + "max_ms": None, + } + return { + "available": True, + "count": len(values), + "first_ms": values[0], + "last_ms": values[-1], + "mean_ms": sum(values) / len(values), + "median_ms": percentile(values, 50.0), + "p95_ms": percentile(values, 95.0), + "max_ms": max(values), + } + + def fit_linear(rows, key): points = [(row["event_time_s"], row[key]) for row in rows if row.get(key) is not None] if len(points) < 2: @@ -544,6 +701,16 @@ def correlation(rows, left_key, right_key): return sum((x - mean_x) * (y - mean_y) for x, y in pairs) / math.sqrt(denom_x * denom_y) +capture_start_unix_ns = parse_capture_start_unix_ns(capture_log_path) +clock_alignment = load_json_or_empty(clock_alignment_path) +clock_alignment_available = bool(clock_alignment.get("available")) +theia_to_tethys_offset_ns = ( + as_int_or_none(clock_alignment.get("theia_to_tethys_offset_ns")) + if clock_alignment_available + else None +) +clock_uncertainty_ms = as_float(clock_alignment.get("uncertainty_ms"), 0.0) + joined = [] for observed in report.get("paired_events", []): event_id = int(observed.get("event_id", -1)) @@ -552,6 +719,46 @@ for observed in report.get("paired_events", []): continue observed_skew_ms = finite(observed.get("skew_ms")) server_feed_delta_ms = finite(server.get("server_feed_delta_ms")) + tethys_video_time_s = finite(observed.get("video_time_s")) + tethys_audio_time_s = finite(observed.get("audio_time_s")) + server_video_feed_s = finite(server.get("video_feed_monotonic_us")) + server_audio_push_s = finite(server.get("audio_push_monotonic_us")) + if server_video_feed_s is not None: + server_video_feed_s /= 1_000_000.0 + if server_audio_push_s is not None: + server_audio_push_s /= 1_000_000.0 + server_video_feed_unix_ns = as_int_or_none(server.get("video_feed_unix_ns")) + server_audio_push_unix_ns = as_int_or_none(server.get("audio_push_unix_ns")) + tethys_video_unix_ns = ( + capture_start_unix_ns + int(round(tethys_video_time_s * 1_000_000_000.0)) + if capture_start_unix_ns is not None and tethys_video_time_s is not None + else None + ) + tethys_audio_unix_ns = ( + capture_start_unix_ns + int(round(tethys_audio_time_s * 1_000_000_000.0)) + if capture_start_unix_ns is not None and tethys_audio_time_s is not None + else None + ) + corrected_server_video_feed_unix_ns = ( + server_video_feed_unix_ns + theia_to_tethys_offset_ns + if server_video_feed_unix_ns is not None and theia_to_tethys_offset_ns is not None + else None + ) + corrected_server_audio_push_unix_ns = ( + server_audio_push_unix_ns + theia_to_tethys_offset_ns + if server_audio_push_unix_ns is not None and theia_to_tethys_offset_ns is not None + else None + ) + video_freshness_ms = ( + (tethys_video_unix_ns - corrected_server_video_feed_unix_ns) / 1_000_000.0 + if tethys_video_unix_ns is not None and corrected_server_video_feed_unix_ns is not None + else None + ) + audio_freshness_ms = ( + (tethys_audio_unix_ns - corrected_server_audio_push_unix_ns) / 1_000_000.0 + if tethys_audio_unix_ns is not None and corrected_server_audio_push_unix_ns is not None + else None + ) residual_path_skew_ms = ( observed_skew_ms - server_feed_delta_ms if observed_skew_ms is not None and server_feed_delta_ms is not None @@ -564,11 +771,19 @@ for observed in report.get("paired_events", []): "event_time_s": planned_start_us / 1_000_000.0, "planned_start_us": planned_start_us, "planned_end_us": int(server.get("planned_end_us", 0)), - "tethys_video_time_s": finite(observed.get("video_time_s")), - "tethys_audio_time_s": finite(observed.get("audio_time_s")), + "tethys_video_time_s": tethys_video_time_s, + "tethys_audio_time_s": tethys_audio_time_s, "observed_skew_ms": observed_skew_ms, "server_video_feed_monotonic_us": server.get("video_feed_monotonic_us"), "server_audio_push_monotonic_us": server.get("audio_push_monotonic_us"), + "server_video_feed_unix_ns": server_video_feed_unix_ns, + "server_audio_push_unix_ns": server_audio_push_unix_ns, + "tethys_video_unix_ns": tethys_video_unix_ns, + "tethys_audio_unix_ns": tethys_audio_unix_ns, + "server_video_feed_s": server_video_feed_s, + "server_audio_push_s": server_audio_push_s, + "video_freshness_ms": video_freshness_ms, + "audio_freshness_ms": audio_freshness_ms, "server_feed_delta_ms": server_feed_delta_ms, "residual_path_skew_ms": residual_path_skew_ms, "confidence": finite(observed.get("confidence")), @@ -582,6 +797,10 @@ for row in joined: observed_model = fit_linear(joined, "observed_skew_ms") server_model = fit_linear(joined, "server_feed_delta_ms") residual_model = fit_linear(joined, "residual_path_skew_ms") +video_freshness_model = fit_linear(joined, "video_freshness_ms") +audio_freshness_model = fit_linear(joined, "audio_freshness_ms") +video_freshness_stats = stats(joined, "video_freshness_ms") +audio_freshness_stats = stats(joined, "audio_freshness_ms") server_observed_correlation = correlation(joined, "server_feed_delta_ms", "observed_skew_ms") observed_drift = observed_model.get("drift_ms", 0.0) @@ -600,6 +819,48 @@ correction_mode = ( else "scalar_candidate" ) +max_freshness_age_ms = max(1.0, as_float(max_freshness_age_raw, 1000.0)) +max_freshness_drift_ms = max(0.0, as_float(max_freshness_drift_raw, 100.0)) +freshness_p95_values = [ + value + for value in [ + video_freshness_stats.get("p95_ms"), + audio_freshness_stats.get("p95_ms"), + ] + if value is not None +] +freshness_drift_values = [ + abs(value) + for value in [ + video_freshness_model.get("drift_ms"), + audio_freshness_model.get("drift_ms"), + ] + if value is not None and math.isfinite(value) +] +freshness_worst_p95_ms = max(freshness_p95_values) if freshness_p95_values else None +freshness_worst_drift_ms = max(freshness_drift_values) if freshness_drift_values else None +if not freshness_p95_values: + freshness_status = "unknown" + freshness_reason = "clock-aligned server feed and Tethys capture timestamps were not available" +elif freshness_worst_p95_ms <= max_freshness_age_ms and ( + freshness_worst_drift_ms is None or freshness_worst_drift_ms <= max_freshness_drift_ms +): + freshness_status = "pass" + freshness_reason = ( + f"worst p95 freshness {freshness_worst_p95_ms:.1f} ms <= " + f"{max_freshness_age_ms:.1f} ms and worst freshness drift " + f"{(freshness_worst_drift_ms or 0.0):.1f} ms <= {max_freshness_drift_ms:.1f} ms" + ) +else: + freshness_status = "fail" + freshness_reason = ( + f"worst p95 freshness " + f"{freshness_worst_p95_ms if freshness_worst_p95_ms is not None else 0.0:.1f} ms " + f"(limit {max_freshness_age_ms:.1f} ms), worst freshness drift " + f"{freshness_worst_drift_ms if freshness_worst_drift_ms is not None else 0.0:.1f} ms " + f"(limit {max_freshness_drift_ms:.1f} ms)" + ) + artifact = { "schema": "lesavka.output-delay-correlation.v1", "report_json": report_path, @@ -609,6 +870,23 @@ artifact = { "observed_skew_model": observed_model, "server_feed_delta_model": server_model, "residual_path_skew_model": residual_model, + "freshness": { + "schema": "lesavka.output-freshness-summary.v1", + "status": freshness_status, + "reason": freshness_reason, + "scope": "clock-corrected server feed to Tethys capture event from the same paired signatures", + "capture_start_unix_ns": capture_start_unix_ns, + "clock_alignment": clock_alignment, + "clock_uncertainty_ms": clock_uncertainty_ms, + "max_age_limit_ms": max_freshness_age_ms, + "max_drift_limit_ms": max_freshness_drift_ms, + "worst_p95_freshness_ms": freshness_worst_p95_ms, + "worst_freshness_drift_ms": freshness_worst_drift_ms, + "video_freshness_stats": video_freshness_stats, + "audio_freshness_stats": audio_freshness_stats, + "video_freshness_model": video_freshness_model, + "audio_freshness_model": audio_freshness_model, + }, "server_observed_correlation": server_observed_correlation, "server_drift_share_of_observed": server_share, "dominant_layer": dominant_layer, @@ -638,6 +916,14 @@ with pathlib.Path(output_csv_path).open("w", newline="", encoding="utf-8") as ha "tethys_video_time_s", "tethys_audio_time_s", "observed_skew_ms", + "server_video_feed_s", + "server_audio_push_s", + "server_video_feed_unix_ns", + "server_audio_push_unix_ns", + "tethys_video_unix_ns", + "tethys_audio_unix_ns", + "video_freshness_ms", + "audio_freshness_ms", "server_feed_delta_ms", "residual_path_skew_ms", "server_video_feed_monotonic_us", @@ -659,6 +945,15 @@ lines = [ f"- residual path model: {residual_model.get('intercept_ms', 0.0):+.3f} ms + {residual_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t", f"- server/observed correlation: {server_observed_correlation:+.3f}", f"- server drift share of observed: {server_share:.3f}", + "", + f"Output freshness for {report_path}", + "- scope: clock-corrected server feed to Tethys capture event from the same paired signatures", + f"- freshness status: {freshness_status} ({freshness_reason})", + f"- clock uncertainty: +/-{clock_uncertainty_ms:.1f} ms", + f"- video freshness: first {video_freshness_stats.get('first_ms') or 0.0:.1f} ms, median {video_freshness_stats.get('median_ms') or 0.0:.1f} ms, p95 {video_freshness_stats.get('p95_ms') or 0.0:.1f} ms, max {video_freshness_stats.get('max_ms') or 0.0:.1f} ms", + f"- audio freshness: first {audio_freshness_stats.get('first_ms') or 0.0:.1f} ms, median {audio_freshness_stats.get('median_ms') or 0.0:.1f} ms, p95 {audio_freshness_stats.get('p95_ms') or 0.0:.1f} ms, max {audio_freshness_stats.get('max_ms') or 0.0:.1f} ms", + f"- video freshness drift: {video_freshness_model.get('drift_ms', 0.0):+.1f} ms over paired events ({video_freshness_model.get('slope_ms_per_s', 0.0):+.3f} ms/s)", + f"- audio freshness drift: {audio_freshness_model.get('drift_ms', 0.0):+.1f} ms over paired events ({audio_freshness_model.get('slope_ms_per_s', 0.0):+.3f} ms/s)", ] summary = "\n".join(lines) + "\n" pathlib.Path(output_txt_path).write_text(summary) @@ -773,6 +1068,7 @@ fi print_lesavka_versions preflight_server_path +write_clock_alignment echo "==> starting Tethys capture on ${TETHYS_HOST}" ssh ${SSH_OPTS} "${TETHYS_HOST}" bash -s -- \ @@ -1179,6 +1475,8 @@ if [[ "${capture_mode}" == "alsa" && "${quiesce_for_alsa}" == "1" ]]; then trap restore_user_audio EXIT fi +printf 'capture_start_unix_ns=%s\n' "$(date +%s%N)" >&2 + if [[ "${capture_mode}" == "pwpipe" ]]; then printf 'using PipeWire-native mux capture target serial: %s\n' "${pw_audio_target}" >&2 timeout "${capture_seconds}" pw-record \ @@ -1519,6 +1817,9 @@ fi if [[ -f "${LOCAL_SERVER_TIMELINE_JSON}" ]]; then echo "server_timeline_json: ${LOCAL_SERVER_TIMELINE_JSON}" fi +if [[ -f "${LOCAL_CLOCK_ALIGNMENT_JSON}" ]]; then + echo "clock_alignment_json: ${LOCAL_CLOCK_ALIGNMENT_JSON}" +fi if [[ -f "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" ]]; then echo "output_delay_correlation_json: ${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" fi diff --git a/server/Cargo.toml b/server/Cargo.toml index 377a8ac..eb7e38f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.19.5" +version = "0.19.6" edition = "2024" autobins = false diff --git a/server/src/output_delay_probe.rs b/server/src/output_delay_probe.rs index c2d22cd..5102ba2 100644 --- a/server/src/output_delay_probe.rs +++ b/server/src/output_delay_probe.rs @@ -6,7 +6,7 @@ use lesavka_common::lesavka::{AudioPacket, OutputDelayProbeRequest, VideoPacket} use serde::Serialize; use std::f64::consts::TAU; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crate::audio::Voice; use crate::camera::{CameraCodec, CameraConfig}; @@ -95,6 +95,7 @@ struct OutputDelayProbeTimeline { audio_chunk_ms: u64, audio_delay_us: u64, video_delay_us: u64, + server_start_unix_ns: u128, pulse_period_ms: u64, pulse_width_ms: u64, warmup_us: u64, @@ -112,11 +113,13 @@ struct OutputDelayProbeEventTimeline { audio_seq: Option, video_feed_monotonic_us: Option, audio_push_monotonic_us: Option, + video_feed_unix_ns: Option, + audio_push_unix_ns: Option, server_feed_delta_ms: Option, } impl OutputDelayProbeTimeline { - fn new(config: &ProbeConfig, camera: &CameraConfig) -> Self { + fn new(config: &ProbeConfig, camera: &CameraConfig, server_start_unix_ns: u128) -> Self { let event_count = config.event_count(); let events = (0..event_count) .map(|event_id| { @@ -130,6 +133,8 @@ impl OutputDelayProbeTimeline { audio_seq: None, video_feed_monotonic_us: None, audio_push_monotonic_us: None, + video_feed_unix_ns: None, + audio_push_unix_ns: None, server_feed_delta_ms: None, } }) @@ -146,6 +151,7 @@ impl OutputDelayProbeTimeline { audio_chunk_ms: AUDIO_CHUNK_MS, audio_delay_us: duration_us(config.audio_delay), video_delay_us: duration_us(config.video_delay), + server_start_unix_ns, pulse_period_ms: config.pulse_period.as_millis() as u64, pulse_width_ms: config.pulse_width.as_millis() as u64, warmup_us: duration_us(config.warmup), @@ -154,24 +160,26 @@ impl OutputDelayProbeTimeline { } } - fn mark_audio(&mut self, slot: ProbeEventSlot, seq: u64, monotonic_us: u64) { + fn mark_audio(&mut self, slot: ProbeEventSlot, seq: u64, monotonic_us: u64, unix_ns: u128) { let Some(event) = self.events.get_mut(slot.event_id) else { return; }; if event.audio_push_monotonic_us.is_none() { event.audio_seq = Some(seq); event.audio_push_monotonic_us = Some(monotonic_us); + event.audio_push_unix_ns = Some(unix_ns); event.update_delta(); } } - fn mark_video(&mut self, slot: ProbeEventSlot, seq: u64, monotonic_us: u64) { + fn mark_video(&mut self, slot: ProbeEventSlot, seq: u64, monotonic_us: u64, unix_ns: u128) { let Some(event) = self.events.get_mut(slot.event_id) else { return; }; if event.video_feed_monotonic_us.is_none() { event.video_seq = Some(seq); event.video_feed_monotonic_us = Some(monotonic_us); + event.video_feed_unix_ns = Some(unix_ns); event.update_delta(); } } @@ -335,8 +343,9 @@ pub async fn run_server_output_delay_probe( let audio_chunk = Duration::from_millis(AUDIO_CHUNK_MS); let samples_per_chunk = ((u64::from(AUDIO_SAMPLE_RATE) * AUDIO_CHUNK_MS) / 1_000) as usize; let frames = EncodedProbeFrames::new(camera)?; + let server_start_unix_ns = unix_ns_now(); let start = tokio::time::Instant::now(); - let mut timeline = OutputDelayProbeTimeline::new(&config, camera); + let mut timeline = OutputDelayProbeTimeline::new(&config, camera, server_start_unix_ns); let mut frame_index = 0u64; let mut audio_index = 0u64; let mut video_frames = 0u64; @@ -378,7 +387,13 @@ pub async fn run_server_output_delay_probe( client_queue_age_ms: 0, }); if let Some(slot) = event_slot { - timeline.mark_audio(slot, seq, monotonic_us_since(start)); + let monotonic_us = monotonic_us_since(start); + timeline.mark_audio( + slot, + seq, + monotonic_us, + unix_ns_from_start(server_start_unix_ns, monotonic_us), + ); } audio_packets = audio_packets.saturating_add(1); audio_index = audio_index.saturating_add(1); @@ -402,7 +417,13 @@ pub async fn run_server_output_delay_probe( ..Default::default() }); if let Some(slot) = event_slot { - timeline.mark_video(slot, seq, monotonic_us_since(start)); + let monotonic_us = monotonic_us_since(start); + timeline.mark_video( + slot, + seq, + monotonic_us, + unix_ns_from_start(server_start_unix_ns, monotonic_us), + ); } video_frames = video_frames.saturating_add(1); frame_index = frame_index.saturating_add(1); @@ -431,8 +452,12 @@ pub async fn run_server_output_delay_probe( video_frames: 1, audio_packets: 1, event_count: config.event_count(), - timeline_json: serde_json::to_string(&OutputDelayProbeTimeline::new(&config, camera)) - .unwrap_or_else(|_| "{}".to_string()), + timeline_json: serde_json::to_string(&OutputDelayProbeTimeline::new( + &config, + camera, + unix_ns_now(), + )) + .unwrap_or_else(|_| "{}".to_string()), }) } @@ -636,6 +661,17 @@ fn duration_us(duration: Duration) -> u64 { duration.as_micros().min(u128::from(u64::MAX)) as u64 } +fn unix_ns_now() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() +} + +fn unix_ns_from_start(server_start_unix_ns: u128, monotonic_us: u64) -> u128 { + server_start_unix_ns.saturating_add(u128::from(monotonic_us).saturating_mul(1_000)) +} + fn monotonic_us_since(start: tokio::time::Instant) -> u64 { duration_us(tokio::time::Instant::now().saturating_duration_since(start)) } @@ -650,7 +686,10 @@ fn duration_mul(duration: Duration, count: u64) -> Duration { #[cfg(test)] mod tests { - use super::{ProbeConfig, render_audio_chunk}; + use super::{ + OutputDelayProbeTimeline, ProbeConfig, duration_us, render_audio_chunk, unix_ns_from_start, + }; + use crate::camera::{CameraCodec, CameraConfig, CameraOutput}; use lesavka_common::lesavka::OutputDelayProbeRequest; use std::time::Duration; @@ -695,4 +734,62 @@ mod tests { assert!(active.iter().any(|byte| *byte != 0)); assert!(idle.iter().all(|byte| *byte == 0)); } + + #[test] + fn timeline_exports_wall_clock_fields_for_freshness() { + let config = ProbeConfig::from_request(&OutputDelayProbeRequest { + duration_seconds: 6, + warmup_seconds: 1, + pulse_period_ms: 1_000, + pulse_width_ms: 120, + event_width_codes: "1".to_string(), + audio_delay_us: 0, + video_delay_us: 0, + }) + .expect("config"); + let camera = CameraConfig { + output: CameraOutput::Uvc, + codec: CameraCodec::Mjpeg, + width: 640, + height: 480, + fps: 20, + hdmi: None, + }; + let start_unix_ns = 1_700_000_000_000_000_000u128; + let mut timeline = OutputDelayProbeTimeline::new(&config, &camera, start_unix_ns); + let slot = config.event_slot_by_id(0); + let video_us = duration_us(Duration::from_micros(slot.planned_start_us)); + let audio_us = video_us.saturating_add(500); + + timeline.mark_video( + slot, + 1, + video_us, + unix_ns_from_start(start_unix_ns, video_us), + ); + timeline.mark_audio( + slot, + 1, + audio_us, + unix_ns_from_start(start_unix_ns, audio_us), + ); + + let json = serde_json::to_value(timeline).expect("timeline json"); + assert_eq!( + json["server_start_unix_ns"].as_u64(), + Some(start_unix_ns as u64) + ); + assert_eq!( + json["events"][0]["video_feed_unix_ns"].as_u64(), + Some(unix_ns_from_start(start_unix_ns, video_us) as u64) + ); + assert_eq!( + json["events"][0]["audio_push_unix_ns"].as_u64(), + Some(unix_ns_from_start(start_unix_ns, audio_us) as u64) + ); + assert_eq!( + json["events"][0]["server_feed_delta_ms"].as_f64(), + Some(0.5) + ); + } } diff --git a/testing/tests/client_manual_sync_script_contract.rs b/testing/tests/client_manual_sync_script_contract.rs index 53506aa..1b8f769 100644 --- a/testing/tests/client_manual_sync_script_contract.rs +++ b/testing/tests/client_manual_sync_script_contract.rs @@ -37,6 +37,7 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "LOCAL_OUTPUT_DELAY_CORRELATION_JSON=\"${LOCAL_REPORT_DIR}/output-delay-correlation.json\"", "LOCAL_OUTPUT_DELAY_CORRELATION_CSV=\"${LOCAL_REPORT_DIR}/output-delay-correlation.csv\"", "LOCAL_OUTPUT_DELAY_CORRELATION_TXT=\"${LOCAL_REPORT_DIR}/output-delay-correlation.txt\"", + "LOCAL_CLOCK_ALIGNMENT_JSON=\"${LOCAL_REPORT_DIR}/clock-alignment.json\"", "LOCAL_OUTPUT_DELAY_JSON=\"${LOCAL_REPORT_DIR}/output-delay-calibration.json\"", "LOCAL_OUTPUT_DELAY_ENV=\"${LOCAL_REPORT_DIR}/output-delay-calibration.env\"", "LESAVKA_OUTPUT_DELAY_CALIBRATION=${LESAVKA_OUTPUT_DELAY_CALIBRATION:-1}", @@ -48,16 +49,27 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS=${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS:-5000}", "LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80}", "LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000}", + "LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS:-1000}", + "LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS:-100}", "LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US=${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US:-0}", "LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US=${LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US:-0}", + "write_clock_alignment", + "capture_start_unix_ns=", "write_output_delay_calibration", "extract_server_timeline", "write_output_delay_correlation", "maybe_apply_output_delay_calibration", "schema\": \"lesavka.output-delay-calibration.v1\"", "schema\": \"lesavka.output-delay-correlation.v1\"", + "schema\": \"lesavka.clock-alignment.v1\"", + "schema\": \"lesavka.output-freshness-summary.v1\"", "server_timeline_json=", + "clock_alignment_json: ${LOCAL_CLOCK_ALIGNMENT_JSON}", "dominant_layer", + "freshness status", + "clock-corrected server feed to Tethys capture event", + "video_freshness_ms", + "audio_freshness_ms", "video_delay_function_candidate", "source\": \"direct-uvc-uac-output-probe\"", "scope\": \"server-output-static-baseline\"",