media: pace uvc output and add rct artifact probe
This commit is contained in:
parent
3318900d96
commit
955cfb3b7b
6
Cargo.lock
generated
6
Cargo.lock
generated
@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
|
||||
|
||||
[[package]]
|
||||
name = "lesavka_client"
|
||||
version = "0.22.32"
|
||||
version = "0.22.33"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-stream",
|
||||
@ -1686,7 +1686,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lesavka_common"
|
||||
version = "0.22.32"
|
||||
version = "0.22.33"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64",
|
||||
@ -1698,7 +1698,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lesavka_server"
|
||||
version = "0.22.32"
|
||||
version = "0.22.33"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64",
|
||||
|
||||
@ -468,6 +468,10 @@ path = "tests/manual/client/sync_probe/client_server_rc_matrix_script_contract.r
|
||||
name = "server_rct_mode_matrix_manual_contract"
|
||||
path = "tests/manual/server/rct/server_rct_mode_matrix_manual_contract.rs"
|
||||
|
||||
[[test]]
|
||||
name = "rct_uvc_artifact_probe_manual_contract"
|
||||
path = "tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs"
|
||||
|
||||
[[test]]
|
||||
name = "google_meet_observer_manual_contract"
|
||||
path = "tests/manual/google_meet/google_meet_observer_manual_contract.rs"
|
||||
|
||||
@ -4,7 +4,7 @@ path = "src/main.rs"
|
||||
|
||||
[package]
|
||||
name = "lesavka_client"
|
||||
version = "0.22.32"
|
||||
version = "0.22.33"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lesavka_common"
|
||||
version = "0.22.32"
|
||||
version = "0.22.33"
|
||||
edition = "2024"
|
||||
build = "build.rs"
|
||||
|
||||
|
||||
@ -355,6 +355,7 @@ LESAVKA_UVC_HEIGHT=$(uvc_env_value LESAVKA_UVC_HEIGHT 720)
|
||||
LESAVKA_UVC_CODEC=${INSTALL_UVC_CODEC}
|
||||
LESAVKA_UVC_BLOCKING=$(uvc_env_value LESAVKA_UVC_BLOCKING 1)
|
||||
LESAVKA_UVC_CONTROL_READ_ONLY=$(uvc_env_value LESAVKA_UVC_CONTROL_READ_ONLY 0)
|
||||
LESAVKA_UVC_QUEUE_PACING=$(uvc_env_value LESAVKA_UVC_QUEUE_PACING 1)
|
||||
LESAVKA_UVC_MAXBURST=$(uvc_env_value LESAVKA_UVC_MAXBURST 0)
|
||||
LESAVKA_UVC_BULK=$(uvc_env_value LESAVKA_UVC_BULK 1)
|
||||
LESAVKA_UVC_FRAME_SIZE_GUARD=$(uvc_env_value LESAVKA_UVC_FRAME_SIZE_GUARD 1)
|
||||
|
||||
474
scripts/manual/run_rct_uvc_artifact_probe.py
Executable file
474
scripts/manual/run_rct_uvc_artifact_probe.py
Executable file
@ -0,0 +1,474 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Capture the RCT-facing UVC webcam and score lower-half video corruption."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import collections
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import shlex
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
DEFAULT_DEVICE_LABEL = "Lesavka Composite"
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description=(
|
||||
"Long-running Lesavka RCT UVC artifact probe. It captures decoded "
|
||||
"grayscale frames from the host-side UVC device, flags lower-half "
|
||||
"slabs/tears, and writes JSON/PGM artifacts for review."
|
||||
)
|
||||
)
|
||||
parser.add_argument("--host", default="", help="optional SSH host, e.g. tethys")
|
||||
parser.add_argument("--source", choices=["device", "x11"], default="device")
|
||||
parser.add_argument("--device", default="auto", help="video device or auto")
|
||||
parser.add_argument("--display", default=":0", help="X11 display for --source x11")
|
||||
parser.add_argument("--crop", default="", help="X11 crop as x,y,width,height for --source x11")
|
||||
parser.add_argument("--device-label", default=DEFAULT_DEVICE_LABEL)
|
||||
parser.add_argument("--width", type=int, default=1280)
|
||||
parser.add_argument("--height", type=int, default=720)
|
||||
parser.add_argument("--fps", type=int, default=30)
|
||||
parser.add_argument("--duration", type=float, default=180.0)
|
||||
parser.add_argument("--artifact-dir", default="")
|
||||
parser.add_argument("--remote-artifact-dir", default="")
|
||||
parser.add_argument("--x-step", type=int, default=8)
|
||||
parser.add_argument("--y-step", type=int, default=4)
|
||||
parser.add_argument("--bands", type=int, default=24)
|
||||
parser.add_argument("--flat-var", type=float, default=18.0)
|
||||
parser.add_argument("--delta-threshold", type=float, default=24.0)
|
||||
parser.add_argument("--jump-threshold", type=float, default=34.0)
|
||||
parser.add_argument("--max-suspicious-artifacts", type=int, default=40)
|
||||
parser.add_argument("--progress-every", type=int, default=150)
|
||||
parser.add_argument("--self-test", action="store_true")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def timestamp() -> str:
|
||||
return time.strftime("%Y%m%d-%H%M%S", time.gmtime())
|
||||
|
||||
|
||||
def default_artifact_dir() -> pathlib.Path:
|
||||
return pathlib.Path("/tmp") / f"lesavka-rct-uvc-artifact-probe-{timestamp()}"
|
||||
|
||||
|
||||
def run_remote(args: argparse.Namespace) -> int:
|
||||
local_artifact_dir = pathlib.Path(args.artifact_dir or f"artifacts/rct-uvc/{timestamp()}")
|
||||
remote_artifact_dir = args.remote_artifact_dir or f"/tmp/lesavka-rct-uvc-artifact-probe-{timestamp()}"
|
||||
remote_script = f"/tmp/lesavka-rct-uvc-artifact-probe-{os.getpid()}.py"
|
||||
script_text = pathlib.Path(__file__).read_text()
|
||||
subprocess.run(
|
||||
["ssh", args.host, f"cat > {shlex.quote(remote_script)} && chmod +x {shlex.quote(remote_script)}"],
|
||||
input=script_text,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
remote_cmd = [
|
||||
"python3",
|
||||
remote_script,
|
||||
"--source",
|
||||
args.source,
|
||||
"--device",
|
||||
args.device,
|
||||
"--device-label",
|
||||
args.device_label,
|
||||
"--display",
|
||||
args.display,
|
||||
"--crop",
|
||||
args.crop,
|
||||
"--width",
|
||||
str(args.width),
|
||||
"--height",
|
||||
str(args.height),
|
||||
"--fps",
|
||||
str(args.fps),
|
||||
"--duration",
|
||||
str(args.duration),
|
||||
"--artifact-dir",
|
||||
remote_artifact_dir,
|
||||
"--x-step",
|
||||
str(args.x_step),
|
||||
"--y-step",
|
||||
str(args.y_step),
|
||||
"--bands",
|
||||
str(args.bands),
|
||||
"--flat-var",
|
||||
str(args.flat_var),
|
||||
"--delta-threshold",
|
||||
str(args.delta_threshold),
|
||||
"--jump-threshold",
|
||||
str(args.jump_threshold),
|
||||
"--max-suspicious-artifacts",
|
||||
str(args.max_suspicious_artifacts),
|
||||
"--progress-every",
|
||||
str(args.progress_every),
|
||||
]
|
||||
print(f"running remote RCT UVC probe on {args.host}: {remote_artifact_dir}", file=sys.stderr)
|
||||
rc = subprocess.run(["ssh", args.host, " ".join(shlex.quote(part) for part in remote_cmd)]).returncode
|
||||
local_artifact_dir.parent.mkdir(parents=True, exist_ok=True)
|
||||
subprocess.run(
|
||||
["scp", "-r", f"{args.host}:{remote_artifact_dir}", str(local_artifact_dir)],
|
||||
check=False,
|
||||
)
|
||||
print(f"artifact_dir: {local_artifact_dir}")
|
||||
return rc
|
||||
|
||||
|
||||
def detect_video_device(label: str) -> str:
|
||||
explicit = os.environ.get("LESAVKA_RCT_UVC_DEVICE")
|
||||
if explicit:
|
||||
return explicit
|
||||
try:
|
||||
listing = subprocess.check_output(["v4l2-ctl", "--list-devices"], text=True)
|
||||
except Exception:
|
||||
return "/dev/video2"
|
||||
current_matches = False
|
||||
for line in listing.splitlines():
|
||||
if not line.startswith(("\t", " ")):
|
||||
current_matches = label.lower() in line.lower()
|
||||
continue
|
||||
value = line.strip()
|
||||
if current_matches and value.startswith("/dev/video"):
|
||||
return value
|
||||
return "/dev/video2"
|
||||
|
||||
|
||||
def band_stats(frame: bytes, width: int, y0: int, y1: int, x_step: int, y_step: int) -> tuple[float, float]:
|
||||
total = 0
|
||||
total2 = 0
|
||||
count = 0
|
||||
view = memoryview(frame)
|
||||
for y in range(y0, y1, y_step):
|
||||
row = y * width
|
||||
for x in range(0, width, x_step):
|
||||
value = view[row + x]
|
||||
total += value
|
||||
total2 += value * value
|
||||
count += 1
|
||||
if count == 0:
|
||||
return 0.0, 0.0
|
||||
mean = total / count
|
||||
variance = max(0.0, (total2 / count) - (mean * mean))
|
||||
return mean, variance
|
||||
|
||||
|
||||
def band_delta(
|
||||
frame: bytes, previous: bytes | None, width: int, y0: int, y1: int, x_step: int, y_step: int
|
||||
) -> float:
|
||||
if previous is None:
|
||||
return 0.0
|
||||
total = 0
|
||||
count = 0
|
||||
view = memoryview(frame)
|
||||
prev = memoryview(previous)
|
||||
for y in range(y0, y1, y_step):
|
||||
row = y * width
|
||||
for x in range(0, width, x_step):
|
||||
total += abs(view[row + x] - prev[row + x])
|
||||
count += 1
|
||||
return total / count if count else 0.0
|
||||
|
||||
|
||||
def max_run(flags: list[bool]) -> int:
|
||||
best = 0
|
||||
cur = 0
|
||||
for flag in flags:
|
||||
cur = cur + 1 if flag else 0
|
||||
best = max(best, cur)
|
||||
return best
|
||||
|
||||
|
||||
def analyze_frame(frame: bytes, previous: bytes | None, args: argparse.Namespace) -> dict[str, Any]:
|
||||
width = args.width
|
||||
height = args.height
|
||||
band_count = max(8, args.bands)
|
||||
band_h = max(1, height // band_count)
|
||||
means: list[float] = []
|
||||
variances: list[float] = []
|
||||
for band in range(band_count):
|
||||
y0 = band * band_h
|
||||
y1 = height if band == band_count - 1 else min(height, y0 + band_h)
|
||||
mean, variance = band_stats(frame, width, y0, y1, args.x_step, args.y_step)
|
||||
means.append(mean)
|
||||
variances.append(variance)
|
||||
|
||||
half = band_count // 2
|
||||
lower_flags = [var < args.flat_var for var in variances[half:]]
|
||||
lower_flat_pct = sum(lower_flags) / max(1, len(lower_flags))
|
||||
lower_flat_run_pct = max_run(lower_flags) / max(1, len(lower_flags))
|
||||
upper_delta = band_delta(frame, previous, width, 0, height // 2, args.x_step, args.y_step)
|
||||
lower_delta = band_delta(frame, previous, width, height // 2, height, args.x_step, args.y_step)
|
||||
jumps = [abs(means[idx] - means[idx - 1]) for idx in range(1, band_count)]
|
||||
lower_jumps = jumps[half:]
|
||||
max_lower_jump = max(lower_jumps or [0.0])
|
||||
sorted_jumps = sorted(jumps)
|
||||
median_jump = sorted_jumps[len(sorted_jumps) // 2] if sorted_jumps else 0.0
|
||||
|
||||
reasons: list[str] = []
|
||||
temporal_lower_jump = lower_delta > args.delta_threshold and lower_delta > max(upper_delta * 2.2, 8.0)
|
||||
lower_delta_skew = temporal_lower_jump
|
||||
lower_boundary_jump = (
|
||||
temporal_lower_jump
|
||||
and max_lower_jump > args.jump_threshold
|
||||
and max_lower_jump > max(median_jump * 3.0, 8.0)
|
||||
)
|
||||
lower_flat_flash = lower_flat_pct >= 0.25 and temporal_lower_jump
|
||||
lower_slab = lower_flat_run_pct >= 0.33 and max_lower_jump > args.jump_threshold and temporal_lower_jump
|
||||
if lower_delta_skew:
|
||||
reasons.append("lower_delta_skew")
|
||||
if lower_boundary_jump:
|
||||
reasons.append("lower_boundary_jump")
|
||||
if lower_flat_flash:
|
||||
reasons.append("lower_flat_flash")
|
||||
if lower_slab:
|
||||
reasons.append("lower_slab")
|
||||
|
||||
suspicious = bool(lower_delta_skew or lower_boundary_jump or lower_flat_flash or lower_slab)
|
||||
return {
|
||||
"suspicious": suspicious,
|
||||
"reasons": reasons,
|
||||
"upper_delta": round(upper_delta, 3),
|
||||
"lower_delta": round(lower_delta, 3),
|
||||
"lower_flat_pct": round(lower_flat_pct, 3),
|
||||
"lower_flat_run_pct": round(lower_flat_run_pct, 3),
|
||||
"max_lower_jump": round(max_lower_jump, 3),
|
||||
"median_band_jump": round(median_jump, 3),
|
||||
"lower_variance_min": round(min(variances[half:] or [0.0]), 3),
|
||||
"lower_variance_mean": round(sum(variances[half:]) / max(1, len(variances[half:])), 3),
|
||||
}
|
||||
|
||||
|
||||
def write_pgm(path: pathlib.Path, frame: bytes, width: int, height: int) -> None:
|
||||
path.write_bytes(f"P5\n{width} {height}\n255\n".encode() + frame)
|
||||
|
||||
|
||||
def parse_crop(value: str, args: argparse.Namespace) -> tuple[int, int, int, int]:
|
||||
if not value:
|
||||
return (0, 0, args.width, args.height)
|
||||
parts = [part.strip() for part in value.split(",")]
|
||||
if len(parts) != 4:
|
||||
raise SystemExit("--crop must be x,y,width,height")
|
||||
try:
|
||||
x, y, width, height = [int(part) for part in parts]
|
||||
except ValueError as exc:
|
||||
raise SystemExit("--crop values must be integers") from exc
|
||||
if width <= 0 or height <= 0:
|
||||
raise SystemExit("--crop width and height must be positive")
|
||||
args.width = width
|
||||
args.height = height
|
||||
return (x, y, width, height)
|
||||
|
||||
|
||||
def ffmpeg_cmd(device: str, args: argparse.Namespace) -> list[str]:
|
||||
if args.source == "x11":
|
||||
x, y, width, height = parse_crop(args.crop, args)
|
||||
display = f"{args.display}+{x},{y}"
|
||||
return [
|
||||
"ffmpeg",
|
||||
"-hide_banner",
|
||||
"-nostdin",
|
||||
"-loglevel",
|
||||
"warning",
|
||||
"-f",
|
||||
"x11grab",
|
||||
"-video_size",
|
||||
f"{width}x{height}",
|
||||
"-framerate",
|
||||
str(args.fps),
|
||||
"-i",
|
||||
display,
|
||||
"-an",
|
||||
"-pix_fmt",
|
||||
"gray",
|
||||
"-f",
|
||||
"rawvideo",
|
||||
"-",
|
||||
]
|
||||
|
||||
return [
|
||||
"ffmpeg",
|
||||
"-hide_banner",
|
||||
"-nostdin",
|
||||
"-loglevel",
|
||||
"warning",
|
||||
"-f",
|
||||
"v4l2",
|
||||
"-input_format",
|
||||
"mjpeg",
|
||||
"-video_size",
|
||||
f"{args.width}x{args.height}",
|
||||
"-framerate",
|
||||
str(args.fps),
|
||||
"-i",
|
||||
device,
|
||||
"-an",
|
||||
"-pix_fmt",
|
||||
"gray",
|
||||
"-f",
|
||||
"rawvideo",
|
||||
"-",
|
||||
]
|
||||
|
||||
|
||||
def run_capture(args: argparse.Namespace) -> int:
|
||||
artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else default_artifact_dir()
|
||||
artifact_dir.mkdir(parents=True, exist_ok=True)
|
||||
device = detect_video_device(args.device_label) if args.device == "auto" else args.device
|
||||
command = ffmpeg_cmd(device, args)
|
||||
(artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n")
|
||||
frame_size = args.width * args.height
|
||||
stderr_path = artifact_dir / "ffmpeg.stderr"
|
||||
jsonl_path = artifact_dir / "frame-metrics.jsonl"
|
||||
started = time.monotonic()
|
||||
previous: bytes | None = None
|
||||
frame_index = 0
|
||||
suspicious_count = 0
|
||||
artifacts_written = 0
|
||||
reason_counts: collections.Counter[str] = collections.Counter()
|
||||
worst: list[dict[str, Any]] = []
|
||||
with stderr_path.open("wb") as err, jsonl_path.open("w") as metrics:
|
||||
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err)
|
||||
assert proc.stdout is not None
|
||||
try:
|
||||
while time.monotonic() - started < args.duration:
|
||||
frame = proc.stdout.read(frame_size)
|
||||
if len(frame) != frame_size:
|
||||
break
|
||||
frame_index += 1
|
||||
result = analyze_frame(frame, previous, args)
|
||||
previous = frame
|
||||
result.update({"frame": frame_index, "elapsed_s": round(time.monotonic() - started, 3)})
|
||||
if result["suspicious"]:
|
||||
suspicious_count += 1
|
||||
reason_counts.update(result["reasons"])
|
||||
worst.append(result)
|
||||
worst = sorted(
|
||||
worst,
|
||||
key=lambda item: (item["max_lower_jump"], item["lower_delta"]),
|
||||
reverse=True,
|
||||
)[:20]
|
||||
if artifacts_written < args.max_suspicious_artifacts:
|
||||
write_pgm(
|
||||
artifact_dir / f"suspicious_{frame_index:06d}.pgm",
|
||||
frame,
|
||||
args.width,
|
||||
args.height,
|
||||
)
|
||||
artifacts_written += 1
|
||||
if frame_index <= 5 or result["suspicious"] or frame_index % args.progress_every == 0:
|
||||
metrics.write(json.dumps(result, sort_keys=True) + "\n")
|
||||
if frame_index % args.progress_every == 0:
|
||||
print(
|
||||
f"frames={frame_index} suspicious={suspicious_count} latest={result}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
finally:
|
||||
proc.terminate()
|
||||
try:
|
||||
proc.wait(timeout=3)
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.kill()
|
||||
elapsed = max(0.001, time.monotonic() - started)
|
||||
summary = {
|
||||
"schema": "lesavka.rct-uvc-artifact-probe.v1",
|
||||
"source": args.source,
|
||||
"device": device,
|
||||
"width": args.width,
|
||||
"height": args.height,
|
||||
"fps_requested": args.fps,
|
||||
"duration_requested_s": args.duration,
|
||||
"duration_observed_s": round(elapsed, 3),
|
||||
"frames": frame_index,
|
||||
"fps_observed": round(frame_index / elapsed, 3),
|
||||
"suspicious_frames": suspicious_count,
|
||||
"suspicious_pct": round((suspicious_count / frame_index * 100.0) if frame_index else 0.0, 3),
|
||||
"reason_counts": dict(reason_counts),
|
||||
"worst_frames": worst,
|
||||
"artifact_dir": str(artifact_dir),
|
||||
"ffmpeg_stderr": str(stderr_path),
|
||||
}
|
||||
(artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n")
|
||||
(artifact_dir / "summary.txt").write_text(format_summary(summary))
|
||||
print(format_summary(summary), end="")
|
||||
print(f"artifact_dir: {artifact_dir}")
|
||||
return 0 if frame_index > 0 else 2
|
||||
|
||||
|
||||
def format_summary(summary: dict[str, Any]) -> str:
|
||||
lines = [
|
||||
"Lesavka RCT UVC artifact probe",
|
||||
f"source: {summary.get('source', 'device')}",
|
||||
f"device: {summary['device']}",
|
||||
f"mode: {summary['width']}x{summary['height']}@{summary['fps_requested']}",
|
||||
f"frames: {summary['frames']} ({summary['fps_observed']} fps observed)",
|
||||
f"suspicious: {summary['suspicious_frames']} ({summary['suspicious_pct']}%)",
|
||||
f"reasons: {summary['reason_counts']}",
|
||||
f"artifacts: {summary['artifact_dir']}",
|
||||
"",
|
||||
]
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def synthetic_frame(width: int, height: int, shift: int = 0, corrupt: bool = False) -> bytes:
|
||||
data = bytearray(width * height)
|
||||
for y in range(height):
|
||||
row = y * width
|
||||
for x in range(width):
|
||||
data[row + x] = (x // 8 + y // 4 + shift) % 256
|
||||
if corrupt:
|
||||
for y in range(height // 2, height):
|
||||
row = y * width
|
||||
data[row : row + width] = bytes([128]) * width
|
||||
return bytes(data)
|
||||
|
||||
|
||||
def run_self_test(args: argparse.Namespace) -> int:
|
||||
artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else default_artifact_dir()
|
||||
artifact_dir.mkdir(parents=True, exist_ok=True)
|
||||
args.width = 160
|
||||
args.height = 90
|
||||
frames = [synthetic_frame(args.width, args.height, idx) for idx in range(4)]
|
||||
frames.append(synthetic_frame(args.width, args.height, 5, corrupt=True))
|
||||
previous = None
|
||||
suspicious = 0
|
||||
records = []
|
||||
for idx, frame in enumerate(frames, start=1):
|
||||
result = analyze_frame(frame, previous, args)
|
||||
previous = frame
|
||||
result["frame"] = idx
|
||||
records.append(result)
|
||||
if result["suspicious"]:
|
||||
suspicious += 1
|
||||
write_pgm(artifact_dir / f"selftest_suspicious_{idx:06d}.pgm", frame, args.width, args.height)
|
||||
summary = {
|
||||
"schema": "lesavka.rct-uvc-artifact-probe.self-test.v1",
|
||||
"frames": len(frames),
|
||||
"suspicious_frames": suspicious,
|
||||
"records": records,
|
||||
"artifact_dir": str(artifact_dir),
|
||||
}
|
||||
(artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n")
|
||||
print(json.dumps(summary, indent=2, sort_keys=True))
|
||||
return 0 if suspicious >= 1 else 1
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
if args.self_test:
|
||||
return run_self_test(args)
|
||||
if args.host and args.host not in {"localhost", "127.0.0.1"}:
|
||||
if not shutil.which("ssh") or not shutil.which("scp"):
|
||||
print("ssh and scp are required for --host", file=sys.stderr)
|
||||
return 2
|
||||
return run_remote(args)
|
||||
return run_capture(args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@ -10,7 +10,7 @@ bench = false
|
||||
|
||||
[package]
|
||||
name = "lesavka_server"
|
||||
version = "0.22.32"
|
||||
version = "0.22.33"
|
||||
edition = "2024"
|
||||
autobins = false
|
||||
|
||||
|
||||
@ -244,6 +244,8 @@ struct UvcVideoStream {
|
||||
frame_path: std::path::PathBuf,
|
||||
latest_frame: Vec<u8>,
|
||||
frame_max_bytes: usize,
|
||||
frame_period: Option<Duration>,
|
||||
next_queue_at: Option<Instant>,
|
||||
streaming: bool,
|
||||
stats: UvcVideoStats,
|
||||
}
|
||||
@ -257,6 +259,8 @@ struct UvcVideoStats {
|
||||
rejected_invalid: u64,
|
||||
fallback_idle: u64,
|
||||
latest_bytes: usize,
|
||||
paced_sleeps: u64,
|
||||
paced_sleep_ms: u64,
|
||||
last_report: Option<Instant>,
|
||||
}
|
||||
|
||||
@ -268,6 +272,8 @@ impl UvcVideoStream {
|
||||
frame_path: frame_spool_path(),
|
||||
latest_frame: IDLE_MJPEG_FRAME.to_vec(),
|
||||
frame_max_bytes: MAX_MJPEG_FRAME_BYTES,
|
||||
frame_period: None,
|
||||
next_queue_at: None,
|
||||
streaming: false,
|
||||
stats: UvcVideoStats::default(),
|
||||
}
|
||||
@ -276,6 +282,8 @@ impl UvcVideoStream {
|
||||
fn start(&mut self, cfg: UvcConfig) -> Result<()> {
|
||||
self.stop();
|
||||
self.frame_max_bytes = uvc_frame_max_bytes(cfg);
|
||||
self.frame_period = uvc_queue_period(cfg.fps);
|
||||
self.next_queue_at = None;
|
||||
self.set_format(cfg)?;
|
||||
self.request_buffers(uvc_buffer_count())?;
|
||||
for index in 0..self.buffers.len() {
|
||||
@ -417,6 +425,9 @@ impl UvcVideoStream {
|
||||
}
|
||||
|
||||
fn queue_buffer(&mut self, index: u32) -> Result<()> {
|
||||
if self.streaming {
|
||||
self.pace_queue_if_needed();
|
||||
}
|
||||
self.refresh_latest_frame();
|
||||
let Some(buffer) = self.buffers.get(index as usize) else {
|
||||
return Ok(());
|
||||
@ -453,6 +464,23 @@ impl UvcVideoStream {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pace_queue_if_needed(&mut self) {
|
||||
let Some(period) = self.frame_period else {
|
||||
return;
|
||||
};
|
||||
let now = Instant::now();
|
||||
if let Some(deadline) = self.next_queue_at {
|
||||
if let Some(delay) = deadline.checked_duration_since(now) {
|
||||
if !delay.is_zero() {
|
||||
thread::sleep(delay);
|
||||
self.stats.paced_sleeps += 1;
|
||||
self.stats.paced_sleep_ms += delay.as_millis().min(u128::from(u64::MAX)) as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.next_queue_at = Some(Instant::now() + period);
|
||||
}
|
||||
|
||||
fn refresh_latest_frame(&mut self) {
|
||||
let stale = frame_spool_is_stale(&self.frame_path, frame_spool_max_age());
|
||||
if stale && looks_like_mjpeg_frame(&self.latest_frame) {
|
||||
@ -512,7 +540,7 @@ impl UvcVideoStream {
|
||||
}
|
||||
self.stats.last_report = Some(now);
|
||||
eprintln!(
|
||||
"[lesavka-uvc] video stats queued={} reloaded={} stale_replay={} rejected_oversize={} rejected_invalid={} fallback_idle={} latest_bytes={} frame_cap={}",
|
||||
"[lesavka-uvc] video stats queued={} reloaded={} stale_replay={} rejected_oversize={} rejected_invalid={} fallback_idle={} latest_bytes={} frame_cap={} paced_sleeps={} paced_sleep_ms={}",
|
||||
self.stats.queued,
|
||||
self.stats.reloaded,
|
||||
self.stats.replayed_stale,
|
||||
@ -520,7 +548,9 @@ impl UvcVideoStream {
|
||||
self.stats.rejected_invalid,
|
||||
self.stats.fallback_idle,
|
||||
self.stats.latest_bytes,
|
||||
self.frame_payload_limit()
|
||||
self.frame_payload_limit(),
|
||||
self.stats.paced_sleeps,
|
||||
self.stats.paced_sleep_ms
|
||||
);
|
||||
if let Some(path) = uvc_stats_path() {
|
||||
let _ = write_atomic_text(
|
||||
@ -603,6 +633,14 @@ fn uvc_idle_pump_sleep() -> Duration {
|
||||
))
|
||||
}
|
||||
|
||||
fn uvc_queue_period(fps: u32) -> Option<Duration> {
|
||||
if !env_flag_enabled("LESAVKA_UVC_QUEUE_PACING", true) {
|
||||
return None;
|
||||
}
|
||||
let fps = fps.max(1);
|
||||
Some(Duration::from_nanos(1_000_000_000 / u64::from(fps)))
|
||||
}
|
||||
|
||||
/// Bound MJPEG frames before they enter the USB UVC helper.
|
||||
///
|
||||
/// Inputs: UVC mode plus optional `LESAVKA_UVC_FRAME_MAX_BYTES` or
|
||||
@ -700,7 +738,7 @@ fn write_atomic_text(path: &std::path::Path, text: &str) -> Result<()> {
|
||||
|
||||
fn uvc_stats_snapshot_json(stats: &UvcVideoStats, frame_cap: usize) -> String {
|
||||
format!(
|
||||
"{{\"queued\":{},\"reloaded\":{},\"stale_replay\":{},\"rejected_oversize\":{},\"rejected_invalid\":{},\"fallback_idle\":{},\"latest_bytes\":{},\"frame_cap\":{}}}\n",
|
||||
"{{\"queued\":{},\"reloaded\":{},\"stale_replay\":{},\"rejected_oversize\":{},\"rejected_invalid\":{},\"fallback_idle\":{},\"latest_bytes\":{},\"frame_cap\":{},\"paced_sleeps\":{},\"paced_sleep_ms\":{}}}\n",
|
||||
stats.queued,
|
||||
stats.reloaded,
|
||||
stats.replayed_stale,
|
||||
@ -708,7 +746,9 @@ fn uvc_stats_snapshot_json(stats: &UvcVideoStats, frame_cap: usize) -> String {
|
||||
stats.rejected_invalid,
|
||||
stats.fallback_idle,
|
||||
stats.latest_bytes,
|
||||
frame_cap
|
||||
frame_cap,
|
||||
stats.paced_sleeps,
|
||||
stats.paced_sleep_ms
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@ -162,6 +162,8 @@ struct UvcVideoStream {
|
||||
frame_path: std::path::PathBuf,
|
||||
latest_frame: Vec<u8>,
|
||||
frame_max_bytes: usize,
|
||||
frame_period: Option<std::time::Duration>,
|
||||
next_queue_at: Option<std::time::Instant>,
|
||||
}
|
||||
|
||||
#[cfg(coverage)]
|
||||
@ -174,6 +176,8 @@ struct UvcVideoStats {
|
||||
rejected_invalid: u64,
|
||||
fallback_idle: u64,
|
||||
latest_bytes: usize,
|
||||
paced_sleeps: u64,
|
||||
paced_sleep_ms: u64,
|
||||
}
|
||||
|
||||
#[cfg(coverage)]
|
||||
@ -184,6 +188,8 @@ impl UvcVideoStream {
|
||||
frame_path: frame_spool_path(),
|
||||
latest_frame: IDLE_MJPEG_FRAME.to_vec(),
|
||||
frame_max_bytes: MAX_MJPEG_FRAME_BYTES,
|
||||
frame_period: None,
|
||||
next_queue_at: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -151,6 +151,20 @@ fn uvc_idle_pump_sleep() -> std::time::Duration {
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(coverage)]
|
||||
/// Returns the frame queue pacing period for the negotiated UVC frame rate.
|
||||
///
|
||||
/// Inputs: `LESAVKA_UVC_QUEUE_PACING` plus the active frame rate. Output:
|
||||
/// `None` when pacing is explicitly disabled. Why: the RCT-facing host must
|
||||
/// not be overfed faster than the advertised descriptor cadence.
|
||||
fn uvc_queue_period(fps: u32) -> Option<std::time::Duration> {
|
||||
if !env_flag_enabled("LESAVKA_UVC_QUEUE_PACING", true) {
|
||||
return None;
|
||||
}
|
||||
let fps = fps.max(1);
|
||||
Some(std::time::Duration::from_nanos(1_000_000_000 / u64::from(fps)))
|
||||
}
|
||||
|
||||
#[cfg(coverage)]
|
||||
/// Bound MJPEG frames before they enter the USB UVC helper.
|
||||
///
|
||||
@ -268,7 +282,7 @@ fn write_atomic_text(path: &std::path::Path, text: &str) -> Result<()> {
|
||||
#[cfg(coverage)]
|
||||
fn uvc_stats_snapshot_json(stats: &UvcVideoStats, frame_cap: usize) -> String {
|
||||
format!(
|
||||
"{{\"queued\":{},\"reloaded\":{},\"stale_replay\":{},\"rejected_oversize\":{},\"rejected_invalid\":{},\"fallback_idle\":{},\"latest_bytes\":{},\"frame_cap\":{}}}\n",
|
||||
"{{\"queued\":{},\"reloaded\":{},\"stale_replay\":{},\"rejected_oversize\":{},\"rejected_invalid\":{},\"fallback_idle\":{},\"latest_bytes\":{},\"frame_cap\":{},\"paced_sleeps\":{},\"paced_sleep_ms\":{}}}\n",
|
||||
stats.queued,
|
||||
stats.reloaded,
|
||||
stats.replayed_stale,
|
||||
@ -276,7 +290,9 @@ fn uvc_stats_snapshot_json(stats: &UvcVideoStats, frame_cap: usize) -> String {
|
||||
stats.rejected_invalid,
|
||||
stats.fallback_idle,
|
||||
stats.latest_bytes,
|
||||
frame_cap
|
||||
frame_cap,
|
||||
stats.paced_sleeps,
|
||||
stats.paced_sleep_ms
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@ -360,12 +360,16 @@ mod uvc_binary {
|
||||
rejected_invalid: 3,
|
||||
fallback_idle: 4,
|
||||
latest_bytes: 77_036,
|
||||
paced_sleeps: 5,
|
||||
paced_sleep_ms: 123,
|
||||
last_report: None,
|
||||
};
|
||||
let json = uvc_stats_snapshot_json(&stats, 333_333);
|
||||
assert!(json.contains("\"queued\":7"));
|
||||
assert!(json.contains("\"latest_bytes\":77036"));
|
||||
assert!(json.contains("\"frame_cap\":333333"));
|
||||
assert!(json.contains("\"paced_sleeps\":5"));
|
||||
assert!(json.contains("\"paced_sleep_ms\":123"));
|
||||
|
||||
let dir = tempfile::tempdir().expect("tempdir");
|
||||
let path = dir.path().join("uvc").join("stats.json");
|
||||
|
||||
@ -250,7 +250,7 @@ mod uvc_binary_extra {
|
||||
assert_eq!(out[2], 1);
|
||||
assert_eq!(out[3], 1);
|
||||
assert_eq!(read_le32(&out, 4), 333_333);
|
||||
assert_eq!(read_le32(&out, 18), 1920 * 1080 * 2);
|
||||
assert_eq!(read_le32(&out, 18), state.cfg.frame_size);
|
||||
assert_eq!(read_le32(&out, 22), state.cfg.max_packet);
|
||||
}
|
||||
|
||||
@ -360,6 +360,10 @@ mod uvc_binary_extra {
|
||||
with_var("LESAVKA_UVC_FRAME_MAX_AGE_MS", None::<&str>, || {
|
||||
assert_eq!(uvc_buffer_count(), 2);
|
||||
assert_eq!(uvc_idle_pump_sleep(), std::time::Duration::from_millis(2));
|
||||
assert_eq!(
|
||||
uvc_queue_period(30),
|
||||
Some(std::time::Duration::from_nanos(33_333_333))
|
||||
);
|
||||
assert_eq!(
|
||||
frame_spool_max_age(),
|
||||
Some(std::time::Duration::from_millis(1_000))
|
||||
@ -375,9 +379,12 @@ mod uvc_binary_extra {
|
||||
with_var("LESAVKA_UVC_BUFFER_COUNT", Some("99"), || {
|
||||
with_var("LESAVKA_UVC_IDLE_PUMP_MS", Some("11"), || {
|
||||
with_var("LESAVKA_UVC_FRAME_MAX_AGE_MS", Some("0"), || {
|
||||
assert_eq!(uvc_buffer_count(), 8);
|
||||
assert_eq!(uvc_idle_pump_sleep(), std::time::Duration::from_millis(11));
|
||||
assert_eq!(frame_spool_max_age(), None);
|
||||
with_var("LESAVKA_UVC_QUEUE_PACING", Some("0"), || {
|
||||
assert_eq!(uvc_buffer_count(), 8);
|
||||
assert_eq!(uvc_idle_pump_sleep(), std::time::Duration::from_millis(11));
|
||||
assert_eq!(uvc_queue_period(30), None);
|
||||
assert_eq!(frame_spool_max_age(), None);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -0,0 +1,91 @@
|
||||
// Contract coverage for the RCT-side UVC artifact probe.
|
||||
//
|
||||
// Scope: the manual long-running UVC corruption detector used against tethys.
|
||||
// Targets: `scripts/manual/run_rct_uvc_artifact_probe.py`.
|
||||
// Why: lower-half UVC tears are late-path artifacts, so we need a durable
|
||||
// host-side probe that can catch them from the same device Google Meet reads.
|
||||
|
||||
use std::{fs, path::PathBuf, process::Command};
|
||||
|
||||
use serde_json::Value;
|
||||
|
||||
const PROBE_SRC: &str = include_str!(concat!(
|
||||
env!("CARGO_MANIFEST_DIR"),
|
||||
"/scripts/manual/run_rct_uvc_artifact_probe.py"
|
||||
));
|
||||
|
||||
fn repo_script_path() -> PathBuf {
|
||||
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("scripts/manual/run_rct_uvc_artifact_probe.py")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rct_uvc_artifact_probe_documents_late_path_lower_half_detection() {
|
||||
for expected in [
|
||||
"lesavka.rct-uvc-artifact-probe.v1",
|
||||
"Lesavka Composite",
|
||||
"lower_delta_skew",
|
||||
"lower_boundary_jump",
|
||||
"lower_flat_flash",
|
||||
"lower_slab",
|
||||
"ffmpeg",
|
||||
"v4l2",
|
||||
"x11grab",
|
||||
"--source",
|
||||
"--crop",
|
||||
"PGM",
|
||||
"--host",
|
||||
] {
|
||||
assert!(
|
||||
PROBE_SRC.contains(expected),
|
||||
"RCT UVC artifact probe should preserve marker {expected}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rct_uvc_artifact_probe_self_test_flags_synthetic_lower_half_slab() {
|
||||
let dir = tempfile::tempdir().expect("tempdir");
|
||||
let output = Command::new("python3")
|
||||
.arg(repo_script_path())
|
||||
.arg("--self-test")
|
||||
.arg("--artifact-dir")
|
||||
.arg(dir.path())
|
||||
.output()
|
||||
.expect("run rct uvc artifact probe self-test");
|
||||
|
||||
assert!(
|
||||
output.status.success(),
|
||||
"probe self-test should succeed: stdout={} stderr={}",
|
||||
String::from_utf8_lossy(&output.stdout),
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
);
|
||||
|
||||
let summary: Value = serde_json::from_str(
|
||||
&fs::read_to_string(dir.path().join("summary.json")).expect("summary json"),
|
||||
)
|
||||
.expect("parse summary json");
|
||||
assert_eq!(
|
||||
summary["schema"],
|
||||
"lesavka.rct-uvc-artifact-probe.self-test.v1"
|
||||
);
|
||||
assert_eq!(summary["frames"], 5);
|
||||
assert!(
|
||||
summary["suspicious_frames"].as_u64().unwrap_or_default() >= 1,
|
||||
"self-test should detect the synthetic lower-half slab: {summary}"
|
||||
);
|
||||
let records = summary["records"].as_array().expect("records array");
|
||||
let corrupt = records
|
||||
.iter()
|
||||
.find(|record| record["suspicious"].as_bool().unwrap_or(false))
|
||||
.expect("suspicious record");
|
||||
let reasons = corrupt["reasons"].as_array().expect("reasons array");
|
||||
assert!(
|
||||
reasons.iter().any(|reason| reason == "lower_slab")
|
||||
|| reasons.iter().any(|reason| reason == "lower_flat_flash"),
|
||||
"self-test should classify the lower-half slab/flash: {corrupt}"
|
||||
);
|
||||
assert!(
|
||||
dir.path().join("selftest_suspicious_000005.pgm").exists(),
|
||||
"probe should save visual evidence for suspicious frames"
|
||||
);
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user