2026-05-16 06:48:01 -03:00
#!/usr/bin/env python3
""" Run synthetic Lesavka uplink media and compare what the RCT receives. """
from __future__ import annotations
import argparse
import collections
import json
import os
import pathlib
import shlex
import shutil
import subprocess
import sys
import time
from typing import Any
DEFAULT_DEVICE_LABEL = " Lesavka Composite "
DEFAULT_MODES = " 1280x720@20,1280x720@30,1920x1080@20,1920x1080@30 "
2026-05-17 00:42:08 -03:00
DEFAULT_JPEG_QUALITY = 82
HIGH_SPEED_ISOCHRONOUS_MICROFRAMES_PER_SEC = 8000
DEFAULT_ISOCHRONOUS_LIMIT_PCT = 85
DEFAULT_UVC_MAX_PACKET = 1024
2026-05-16 06:48:01 -03:00
MARKER_BITS = 32
MARKER_COLUMNS = 16
def parse_args ( ) - > argparse . Namespace :
parser = argparse . ArgumentParser (
description = (
" Manual synthetic end-to-end probe: Theia sends sequence-coded media "
" through StreamWebcamMedia while Tethys captures the received UVC/X11 "
" frames and compares them to the generated source. "
)
)
parser . add_argument ( " --inject-host " , default = " " , help = " Theia SSH host, e.g. titan-jh " )
2026-05-17 01:53:40 -03:00
parser . add_argument ( " --local-inject " , action = " store_true " , help = " run the synthetic injector directly on this host " )
2026-05-16 06:48:01 -03:00
parser . add_argument ( " --rct-host " , default = " " , help = " RCT SSH host, e.g. tethys " )
2026-05-16 22:03:12 -03:00
parser . add_argument ( " --server " , default = " https://127.0.0.1:50051 " )
2026-05-16 06:48:01 -03:00
parser . add_argument ( " --inject-binary " , default = " /usr/local/bin/lesavka-synthetic-uplink " )
parser . add_argument ( " --mode " , default = " 1280x720@30 " , help = f " one mode; baseline set is { DEFAULT_MODES } " )
parser . add_argument ( " --width " , type = int , default = 0 , help = " override capture width " )
parser . add_argument ( " --height " , type = int , default = 0 , help = " override capture height " )
parser . add_argument ( " --fps " , type = int , default = 0 , help = " override capture fps " )
parser . add_argument ( " --duration " , type = float , default = 300.0 )
parser . add_argument ( " --source " , choices = [ " device " , " x11 " ] , default = " device " )
parser . add_argument ( " --device " , default = " auto " )
parser . add_argument ( " --device-label " , default = DEFAULT_DEVICE_LABEL )
parser . add_argument ( " --display " , default = " :0 " )
parser . add_argument ( " --crop " , default = " " , help = " x,y,width,height for --source x11 " )
parser . add_argument ( " --artifact-dir " , default = " " )
parser . add_argument ( " --remote-rct-dir " , default = " " )
parser . add_argument ( " --remote-inject-dir " , default = " " )
2026-05-16 20:58:24 -03:00
parser . add_argument (
" --capture-before-inject " ,
action = " store_true " ,
help = " start RCT capture before synthetic uplink; default starts uplink first so superseded injectors fail fast " ,
)
parser . add_argument ( " --inject-warmup-s " , type = float , default = 1.25 )
2026-05-17 10:09:43 -03:00
parser . add_argument (
" --capture-finish-grace-s " ,
type = float ,
default = 0.0 ,
help = " seconds to wait for capture after injector exits; 0 waits indefinitely " ,
)
2026-05-17 00:42:08 -03:00
parser . add_argument ( " --jpeg-quality " , type = int , default = DEFAULT_JPEG_QUALITY )
parser . add_argument (
" --inject-max-frame-bytes " ,
type = int ,
default = 0 ,
help = " max encoded synthetic MJPEG bytes; default uses the safe high-speed isochronous budget for the selected fps " ,
)
2026-05-16 06:48:01 -03:00
parser . add_argument ( " --x-step " , type = int , default = 8 )
parser . add_argument ( " --y-step " , type = int , default = 4 )
parser . add_argument ( " --bands " , type = int , default = 24 )
parser . add_argument ( " --mae-threshold " , type = float , default = 18.0 )
parser . add_argument ( " --lower-mae-threshold " , type = float , default = 28.0 )
parser . add_argument ( " --lower-skew-ratio " , type = float , default = 1.8 )
parser . add_argument ( " --slab-var " , type = float , default = 20.0 )
parser . add_argument ( " --shift-threshold " , type = float , default = 16.0 )
parser . add_argument ( " --shift-improvement " , type = float , default = 1.25 )
parser . add_argument ( " --max-suspicious-artifacts " , type = int , default = 80 )
parser . add_argument ( " --max-reference-artifacts " , type = int , default = 12 )
parser . add_argument ( " --reference-every " , type = int , default = 900 )
parser . add_argument ( " --progress-every " , type = int , default = 150 )
2026-05-17 10:09:43 -03:00
parser . add_argument (
" --stream-analyze " ,
action = " store_true " ,
help = " debug path: analyze ffmpeg stdout directly instead of spooling raw frames first " ,
)
2026-05-16 06:48:01 -03:00
parser . add_argument ( " --capture-only " , action = " store_true " , help = argparse . SUPPRESS )
parser . add_argument ( " --self-test " , action = " store_true " )
return parser . parse_args ( )
def timestamp ( ) - > str :
return time . strftime ( " % Y % m %d - % H % M % S " , time . gmtime ( ) )
def parse_mode ( value : str ) - > tuple [ int , int , int ] :
try :
size , fps = value . lower ( ) . split ( " @ " , 1 )
width , height = size . split ( " x " , 1 )
return int ( width ) , int ( height ) , int ( fps )
except ValueError as exc :
raise SystemExit ( f " --mode must look like WIDTHxHEIGHT@FPS, got { value !r} " ) from exc
def mode_dimensions ( args : argparse . Namespace ) - > tuple [ int , int , int ] :
width , height , fps = parse_mode ( args . mode )
if args . width :
width = args . width
if args . height :
height = args . height
if args . fps :
fps = args . fps
if width < = 0 or height < = 0 or fps < = 0 :
raise SystemExit ( " width, height, and fps must be positive " )
return width , height , fps
2026-05-17 00:42:08 -03:00
def default_inject_max_frame_bytes ( fps : int ) - > int :
bytes_per_second = (
DEFAULT_UVC_MAX_PACKET
* HIGH_SPEED_ISOCHRONOUS_MICROFRAMES_PER_SEC
* DEFAULT_ISOCHRONOUS_LIMIT_PCT
/ / 100
)
return max ( 64 * 1024 , bytes_per_second / / max ( 1 , fps ) )
2026-05-16 06:48:01 -03:00
def default_artifact_dir ( mode : str ) - > pathlib . Path :
safe_mode = mode . replace ( " @ " , " - " ) . replace ( " x " , " x " )
return pathlib . Path ( " artifacts/synthetic-rct " ) / f " { safe_mode } - { timestamp ( ) } "
def run_remote_orchestrated ( args : argparse . Namespace ) - > int :
2026-05-17 01:53:40 -03:00
if ( not args . inject_host and not args . local_inject ) or not args . rct_host :
raise SystemExit (
" --rct-host and either --inject-host or --local-inject are required unless --capture-only or --self-test is used "
)
2026-05-16 06:48:01 -03:00
if not shutil . which ( " ssh " ) or not shutil . which ( " scp " ) :
raise SystemExit ( " ssh and scp are required for the remote synthetic probe " )
width , height , fps = mode_dimensions ( args )
2026-05-17 00:42:08 -03:00
inject_max_frame_bytes = args . inject_max_frame_bytes or default_inject_max_frame_bytes ( fps )
2026-05-16 06:48:01 -03:00
artifact_dir = pathlib . Path ( args . artifact_dir ) if args . artifact_dir else default_artifact_dir ( args . mode )
artifact_dir . mkdir ( parents = True , exist_ok = True )
remote_rct_dir = args . remote_rct_dir or f " /tmp/lesavka-synthetic-rct-capture- { timestamp ( ) } "
remote_inject_dir = args . remote_inject_dir or f " /tmp/lesavka-synthetic-uplink- { timestamp ( ) } "
remote_script = f " /tmp/lesavka-synthetic-rct-probe- { os . getpid ( ) } .py "
script_text = pathlib . Path ( __file__ ) . read_text ( )
subprocess . run (
[ " ssh " , args . rct_host , f " cat > { shlex . quote ( remote_script ) } && chmod +x { shlex . quote ( remote_script ) } " ] ,
input = script_text ,
text = True ,
check = True ,
)
capture_cmd = [
" python3 " ,
remote_script ,
" --capture-only " ,
" --mode " ,
args . mode ,
" --width " ,
str ( width ) ,
" --height " ,
str ( height ) ,
" --fps " ,
str ( fps ) ,
" --duration " ,
str ( args . duration ) ,
" --source " ,
args . source ,
" --device " ,
args . device ,
" --device-label " ,
args . device_label ,
" --display " ,
args . display ,
" --crop " ,
args . crop ,
" --artifact-dir " ,
remote_rct_dir ,
" --x-step " ,
str ( args . x_step ) ,
" --y-step " ,
str ( args . y_step ) ,
" --bands " ,
str ( args . bands ) ,
" --mae-threshold " ,
str ( args . mae_threshold ) ,
" --lower-mae-threshold " ,
str ( args . lower_mae_threshold ) ,
" --lower-skew-ratio " ,
str ( args . lower_skew_ratio ) ,
" --slab-var " ,
str ( args . slab_var ) ,
" --shift-threshold " ,
str ( args . shift_threshold ) ,
" --shift-improvement " ,
str ( args . shift_improvement ) ,
" --max-suspicious-artifacts " ,
str ( args . max_suspicious_artifacts ) ,
" --max-reference-artifacts " ,
str ( args . max_reference_artifacts ) ,
" --reference-every " ,
str ( args . reference_every ) ,
" --progress-every " ,
str ( args . progress_every ) ,
]
2026-05-17 10:09:43 -03:00
if args . stream_analyze :
capture_cmd . append ( " --stream-analyze " )
2026-05-16 06:48:01 -03:00
inject_cmd = [
args . inject_binary ,
" --server " ,
args . server ,
" --mode " ,
args . mode ,
" --duration " ,
str ( args . duration + 2.0 ) ,
" --artifact-dir " ,
remote_inject_dir ,
2026-05-17 00:42:08 -03:00
" --jpeg-quality " ,
str ( args . jpeg_quality ) ,
" --max-frame-bytes " ,
str ( inject_max_frame_bytes ) ,
2026-05-16 06:48:01 -03:00
" --print-every " ,
str ( args . progress_every ) ,
]
( artifact_dir / " orchestrator-command.txt " ) . write_text ( " " . join ( sys . argv ) + " \n " )
( artifact_dir / " mode.json " ) . write_text (
json . dumps (
{
" schema " : " lesavka.synthetic-rct-probe.run.v1 " ,
" mode " : args . mode ,
" width " : width ,
" height " : height ,
" fps " : fps ,
" source " : args . source ,
" duration_s " : args . duration ,
2026-05-17 00:42:08 -03:00
" jpeg_quality " : args . jpeg_quality ,
" inject_max_frame_bytes " : inject_max_frame_bytes ,
2026-05-16 06:48:01 -03:00
" inject_host " : args . inject_host ,
2026-05-17 01:53:40 -03:00
" local_inject " : args . local_inject ,
2026-05-16 06:48:01 -03:00
" rct_host " : args . rct_host ,
} ,
indent = 2 ,
sort_keys = True ,
)
+ " \n "
)
2026-05-16 20:58:24 -03:00
def start_capture ( ) - > subprocess . Popen [ Any ] :
print ( f " starting RCT capture on { args . rct_host } : { remote_rct_dir } " , file = sys . stderr )
return subprocess . Popen ( [ " ssh " , args . rct_host , " " . join ( shlex . quote ( part ) for part in capture_cmd ) ] )
def start_inject ( ) - > subprocess . Popen [ Any ] :
2026-05-17 01:53:40 -03:00
if args . local_inject :
print ( f " starting local synthetic uplink: { remote_inject_dir } " , file = sys . stderr )
return subprocess . Popen ( inject_cmd )
2026-05-16 20:58:24 -03:00
print ( f " starting synthetic uplink on { args . inject_host } : { remote_inject_dir } " , file = sys . stderr )
return subprocess . Popen ( [ " ssh " , args . inject_host , " " . join ( shlex . quote ( part ) for part in inject_cmd ) ] )
def stop_capture ( process : subprocess . Popen [ Any ] ) - > int | None :
process . terminate ( )
try :
return process . wait ( timeout = 5 )
except subprocess . TimeoutExpired :
process . kill ( )
return process . wait ( )
def wait_capture_or_inject_exit (
capture_process : subprocess . Popen [ Any ] , inject_process : subprocess . Popen [ Any ]
) - > tuple [ int | None , int | None ] :
while True :
capture_status = capture_process . poll ( )
if capture_status is not None :
return capture_status , inject_process . wait ( )
inject_status = inject_process . poll ( )
if inject_status is not None :
2026-05-17 00:42:08 -03:00
if inject_status == 0 :
2026-05-17 10:09:43 -03:00
if args . capture_finish_grace_s < = 0 :
return capture_process . wait ( ) , inject_status
deadline = time . monotonic ( ) + args . capture_finish_grace_s
2026-05-17 00:42:08 -03:00
while time . monotonic ( ) < deadline :
capture_status = capture_process . poll ( )
if capture_status is not None :
return capture_status , inject_status
time . sleep ( 0.25 )
diagnosis . append (
" synthetic uplink completed but RCT capture did not finish; capture likely lagged, froze, or was blocked by another consumer "
)
else :
diagnosis . append (
" synthetic uplink exited while RCT capture was still active; stopping capture because the run is not isolated or the injector failed "
)
2026-05-16 20:58:24 -03:00
print (
f " synthetic uplink exited during capture rc= { inject_status } ; stopping RCT capture " ,
file = sys . stderr ,
)
return stop_capture ( capture_process ) , inject_status
time . sleep ( 0.25 )
capture : subprocess . Popen [ Any ] | None = None
diagnosis : list [ str ] = [ ]
if args . capture_before_inject :
capture = start_capture ( )
time . sleep ( 1.0 )
inject = start_inject ( )
capture_rc , inject_rc = wait_capture_or_inject_exit ( capture , inject )
else :
inject = start_inject ( )
time . sleep ( max ( 0.0 , args . inject_warmup_s ) )
inject_rc = inject . poll ( )
if inject_rc is not None :
capture_rc = None
diagnosis . append (
" synthetic uplink exited before capture warmup completed; disconnect the live client or pause upstream webcam before running the isolated probe "
)
print ( f " synthetic uplink exited before capture started rc= { inject_rc } " , file = sys . stderr )
else :
capture = start_capture ( )
capture_rc , inject_rc = wait_capture_or_inject_exit ( capture , inject )
2026-05-16 06:48:01 -03:00
local_capture = artifact_dir / " capture "
local_inject = artifact_dir / " inject "
2026-05-16 20:58:24 -03:00
if capture is not None :
subprocess . run ( [ " scp " , " -r " , f " { args . rct_host } : { remote_rct_dir } " , str ( local_capture ) ] , check = False )
2026-05-17 01:53:40 -03:00
if args . local_inject :
if pathlib . Path ( remote_inject_dir ) . exists ( ) :
if local_inject . exists ( ) :
shutil . rmtree ( local_inject )
shutil . copytree ( remote_inject_dir , local_inject )
else :
subprocess . run ( [ " scp " , " -r " , f " { args . inject_host } : { remote_inject_dir } " , str ( local_inject ) ] , check = False )
2026-05-16 20:58:24 -03:00
capture_summary = local_capture / " summary.json "
if capture_summary . exists ( ) :
try :
capture_data = json . loads ( capture_summary . read_text ( ) )
decoded_pct = float ( capture_data . get ( " decoded_pct " ) or 0.0 )
if inject_rc != 0 and decoded_pct < 80.0 :
diagnosis . append (
" captured frames did not consistently contain synthetic markers and the injector failed; the RCT capture likely measured a mixed, previous, or live webcam stream "
)
2026-05-17 00:42:08 -03:00
fps_observed = float ( capture_data . get ( " fps_observed " ) or 0.0 )
fps_requested = float ( capture_data . get ( " fps_requested " ) or fps )
if fps_observed and fps_observed < fps_requested * 0.5 :
diagnosis . append (
f " RCT capture decoded only { fps_observed : .3f } fps from a { fps_requested : .0f } fps mode; check for a frozen UVC device or another browser/process holding the camera "
)
frames = int ( capture_data . get ( " frames " ) or 0 )
reason_counts = capture_data . get ( " reason_counts " ) or { }
repeats = int ( reason_counts . get ( " frame_repeat " ) or 0 )
if frames > 0 and repeats > = max ( 3 , int ( frames * 0.9 ) ) :
diagnosis . append (
" RCT capture repeated nearly every decoded synthetic marker; the received UVC stream was stale/frozen instead of advancing "
)
except Exception :
pass
inject_summary = local_inject / " summary.json "
if inject_summary . exists ( ) :
try :
inject_data = json . loads ( inject_summary . read_text ( ) )
oversize_frames = int ( inject_data . get ( " encoded_oversize_frames " ) or 0 )
2026-05-17 01:53:40 -03:00
sent_frames = int ( inject_data . get ( " sent_frames " ) or 0 )
encoded_frames = int ( inject_data . get ( " encoded_frames " ) or 0 )
exit_reason = str ( inject_data . get ( " exit_reason " ) or " " )
2026-05-17 00:42:08 -03:00
max_bytes = inject_data . get ( " encoded_max_bytes " )
max_frame_bytes = inject_data . get ( " max_frame_bytes " )
if oversize_frames :
diagnosis . append (
f " synthetic injector produced { oversize_frames } over-budget MJPEG frame(s), max= { max_bytes } cap= { max_frame_bytes } ; the server will freeze instead of spooling those frames "
)
2026-05-17 01:53:40 -03:00
if inject_rc != 0 and " StreamWebcamMedia closed before accepting synthetic frame " in exit_reason :
diagnosis . append (
f " synthetic injector was preempted after sending { sent_frames } frame(s); disconnect/pause the live Lesavka client upstream before running this isolated probe "
)
elif inject_rc != 0 and encoded_frames > 0 and not oversize_frames :
diagnosis . append (
f " synthetic injector encoded { encoded_frames } in-budget frame(s) before failing; inspect inject/summary.json exit_reason for the stream-close cause "
)
2026-05-16 20:58:24 -03:00
except Exception :
pass
2026-05-16 06:48:01 -03:00
summary = {
" schema " : " lesavka.synthetic-rct-probe.orchestrator.v1 " ,
" mode " : args . mode ,
" capture_rc " : capture_rc ,
" inject_rc " : inject_rc ,
2026-05-16 20:58:24 -03:00
" diagnosis " : diagnosis ,
2026-05-16 06:48:01 -03:00
" artifact_dir " : str ( artifact_dir ) ,
" capture_artifacts " : str ( local_capture ) ,
" inject_artifacts " : str ( local_inject ) ,
}
( artifact_dir / " run-summary.json " ) . write_text ( json . dumps ( summary , indent = 2 , sort_keys = True ) + " \n " )
print ( json . dumps ( summary , indent = 2 , sort_keys = True ) )
print ( f " artifact_dir: { artifact_dir } " )
return 0 if capture_rc == 0 and inject_rc == 0 else 1
def detect_video_device ( label : str ) - > str :
explicit = os . environ . get ( " LESAVKA_RCT_UVC_DEVICE " )
if explicit :
return explicit
try :
listing = subprocess . check_output ( [ " v4l2-ctl " , " --list-devices " ] , text = True )
except Exception :
return " /dev/video2 "
current_matches = False
for line in listing . splitlines ( ) :
if not line . startswith ( ( " \t " , " " ) ) :
current_matches = label . lower ( ) in line . lower ( )
continue
value = line . strip ( )
if current_matches and value . startswith ( " /dev/video " ) :
return value
return " /dev/video2 "
def parse_crop ( args : argparse . Namespace , width : int , height : int ) - > tuple [ int , int , int , int ] :
if not args . crop :
return 0 , 0 , width , height
parts = [ part . strip ( ) for part in args . crop . split ( " , " ) ]
if len ( parts ) != 4 :
raise SystemExit ( " --crop must be x,y,width,height " )
x , y , crop_width , crop_height = [ int ( part ) for part in parts ]
if crop_width < = 0 or crop_height < = 0 :
raise SystemExit ( " --crop width and height must be positive " )
return x , y , crop_width , crop_height
def ffmpeg_cmd ( args : argparse . Namespace , width : int , height : int ) - > tuple [ list [ str ] , int , int , str ] :
if args . source == " x11 " :
x , y , capture_width , capture_height = parse_crop ( args , width , height )
display = f " { args . display } + { x } , { y } "
return (
[
" ffmpeg " ,
" -hide_banner " ,
" -nostdin " ,
" -loglevel " ,
" warning " ,
" -f " ,
" x11grab " ,
" -video_size " ,
f " { capture_width } x { capture_height } " ,
" -framerate " ,
str ( args . fps or parse_mode ( args . mode ) [ 2 ] ) ,
" -i " ,
display ,
" -an " ,
" -pix_fmt " ,
" gray " ,
" -f " ,
" rawvideo " ,
" - " ,
] ,
capture_width ,
capture_height ,
display ,
)
device = detect_video_device ( args . device_label ) if args . device == " auto " else args . device
return (
[
" ffmpeg " ,
" -hide_banner " ,
" -nostdin " ,
" -loglevel " ,
" warning " ,
" -f " ,
" v4l2 " ,
" -input_format " ,
" mjpeg " ,
" -video_size " ,
f " { width } x { height } " ,
" -framerate " ,
str ( args . fps or parse_mode ( args . mode ) [ 2 ] ) ,
" -i " ,
device ,
" -an " ,
" -pix_fmt " ,
" gray " ,
" -f " ,
" rawvideo " ,
" - " ,
] ,
width ,
height ,
device ,
)
def marker_cell ( width : int , height : int ) - > int :
return max ( 6 , min ( 16 , min ( width , height ) / / 80 ) )
def fill_rect ( frame : bytearray , width : int , height : int , x0 : int , y0 : int , w : int , h : int , value : int ) - > None :
for y in range ( max ( 0 , y0 ) , min ( height , y0 + h ) ) :
row = y * width
for x in range ( max ( 0 , x0 ) , min ( width , x0 + w ) ) :
frame [ row + x ] = value
def synthetic_gray ( width : int , height : int , sequence : int ) - > bytes :
data = bytearray ( width * height )
2026-05-17 00:42:08 -03:00
safe_width = max ( width , 1 )
safe_height = max ( height , 1 )
moving_width = min ( max ( width / / 10 , 32 ) , safe_width )
moving_offset = ( sequence * 13 ) % safe_width
2026-05-16 06:48:01 -03:00
center_x = width / / 2
center_y = height / / 2
2026-05-17 00:42:08 -03:00
block_w = max ( width / / 24 , 24 )
block_h = max ( height / / 18 , 18 )
2026-05-16 06:48:01 -03:00
for y in range ( height ) :
row = y * width
for x in range ( width ) :
2026-05-17 00:42:08 -03:00
base = 44 + ( x * 72 / / safe_width ) + ( y * 52 / / safe_height ) + ( ( sequence * 3 ) % 28 )
checker = 30 if ( ( ( x / / block_w ) + ( y / / block_h ) + ( sequence / / 5 ) ) & 1 ) == 0 else 0
value = min ( 238 , base + checker )
moving = ( x + safe_width - moving_offset ) % safe_width
if moving < moving_width :
value = min ( 255 , 220 - ( y * 54 / / safe_height ) )
elif moving < moving_width + 4 :
value = 28
2026-05-16 06:48:01 -03:00
if abs ( x - center_x ) < width / / 9 and abs ( y - center_y ) < height / / 12 :
value = 255 - value / / 2
data [ row + x ] = value
draw_marker ( data , width , height , sequence )
return bytes ( data )
def draw_marker ( frame : bytearray , width : int , height : int , sequence : int ) - > None :
cell = marker_cell ( width , height )
rows = ( MARKER_BITS + MARKER_COLUMNS - 1 ) / / MARKER_COLUMNS
if width < ( MARKER_COLUMNS + 4 ) * cell or height < ( rows + 4 ) * cell :
return
x0 = 2 * cell
y0 = 2 * cell
fill_rect ( frame , width , height , cell , cell , ( MARKER_COLUMNS + 2 ) * cell , ( rows + 2 ) * cell , 32 )
fill_rect ( frame , width , height , x0 - cell , y0 - cell , cell , cell , 255 )
fill_rect ( frame , width , height , x0 + MARKER_COLUMNS * cell , y0 - cell , cell , cell , 0 )
for bit in range ( MARKER_BITS ) :
col = bit % MARKER_COLUMNS
row = bit / / MARKER_COLUMNS
value = 255 if ( ( sequence >> bit ) & 1 ) else 0
fill_rect ( frame , width , height , x0 + col * cell , y0 + row * cell , cell , cell , value )
def cell_mean ( frame : bytes , width : int , x0 : int , y0 : int , cell : int ) - > float :
total = 0
count = 0
inset = max ( 1 , cell / / 4 )
for y in range ( y0 + inset , y0 + cell - inset ) :
row = y * width
for x in range ( x0 + inset , x0 + cell - inset ) :
total + = frame [ row + x ]
count + = 1
return total / max ( 1 , count )
def decode_sequence ( frame : bytes , width : int , height : int ) - > tuple [ int | None , int ] :
cell = marker_cell ( width , height )
rows = ( MARKER_BITS + MARKER_COLUMNS - 1 ) / / MARKER_COLUMNS
if width < ( MARKER_COLUMNS + 4 ) * cell or height < ( rows + 4 ) * cell :
return None , MARKER_BITS
x0 = 2 * cell
y0 = 2 * cell
value = 0
uncertain = 0
for bit in range ( MARKER_BITS ) :
col = bit % MARKER_COLUMNS
row = bit / / MARKER_COLUMNS
mean = cell_mean ( frame , width , x0 + col * cell , y0 + row * cell , cell )
if mean > 165 :
value | = 1 << bit
elif mean > = 90 :
uncertain + = 1
if uncertain > 6 :
return None , uncertain
return value , uncertain
def sampled_abs_delta ( a : bytes , b : bytes , width : int , y0 : int , y1 : int , x_step : int , y_step : int ) - > float :
total = 0
count = 0
for y in range ( y0 , y1 , y_step ) :
row = y * width
for x in range ( 0 , width , x_step ) :
total + = abs ( a [ row + x ] - b [ row + x ] )
count + = 1
return total / max ( 1 , count )
def band_stats ( frame : bytes , width : int , y0 : int , y1 : int , x_step : int , y_step : int ) - > tuple [ float , float ] :
total = 0
total2 = 0
count = 0
for y in range ( y0 , y1 , y_step ) :
row = y * width
for x in range ( 0 , width , x_step ) :
value = frame [ row + x ]
total + = value
total2 + = value * value
count + = 1
mean = total / max ( 1 , count )
return mean , max ( 0.0 , total2 / max ( 1 , count ) - mean * mean )
def shifted_expected_delta ( frame : bytes , expected : bytes , width : int , height : int , shift : int , args : argparse . Namespace ) - > float :
x0 = max ( 0 , - shift )
x1 = min ( width , width - shift )
if x0 > = x1 :
return 0.0
y0 = height / / 4
total = 0
count = 0
for y in range ( y0 , height , args . y_step ) :
row = y * width
for x in range ( x0 , x1 , args . x_step ) :
total + = abs ( frame [ row + x ] - expected [ row + x + shift ] )
count + = 1
return total / max ( 1 , count )
def best_expected_shift ( frame : bytes , expected : bytes , width : int , height : int , args : argparse . Namespace ) - > tuple [ int , float , float , float ] :
zero = shifted_expected_delta ( frame , expected , width , height , 0 , args )
best = zero
best_shift = 0
for shift in [ - 128 , - 96 , - 80 , - 64 , - 48 , - 32 , - 24 , - 16 , - 12 , - 8 , 8 , 12 , 16 , 24 , 32 , 48 , 64 , 80 , 96 , 128 ] :
candidate = shifted_expected_delta ( frame , expected , width , height , shift , args )
if candidate < best :
best = candidate
best_shift = shift
improvement = zero / max ( best , 0.001 ) if best_shift else 1.0
return best_shift , zero , best , improvement
def max_run ( flags : list [ bool ] ) - > int :
best = 0
current = 0
for flag in flags :
current = current + 1 if flag else 0
best = max ( best , current )
return best
def analyze_frame (
frame : bytes ,
width : int ,
height : int ,
args : argparse . Namespace ,
previous_seq : int | None ,
) - > dict [ str , Any ] :
sequence , uncertain_bits = decode_sequence ( frame , width , height )
expected = synthetic_gray ( width , height , sequence or 0 ) if sequence is not None else None
upper_mae = lower_mae = total_mae = 0.0
shift_pixels = 0
shift_zero_delta = shift_best_delta = shift_improvement = 0.0
if expected is not None :
upper_mae = sampled_abs_delta ( frame , expected , width , 0 , height / / 2 , args . x_step , args . y_step )
lower_mae = sampled_abs_delta ( frame , expected , width , height / / 2 , height , args . x_step , args . y_step )
total_mae = sampled_abs_delta ( frame , expected , width , 0 , height , args . x_step , args . y_step )
shift_pixels , shift_zero_delta , shift_best_delta , shift_improvement = best_expected_shift ( frame , expected , width , height , args )
band_count = max ( 8 , args . bands )
band_h = max ( 1 , height / / band_count )
means : list [ float ] = [ ]
variances : list [ float ] = [ ]
for band in range ( band_count ) :
y0 = band * band_h
y1 = height if band == band_count - 1 else min ( height , y0 + band_h )
mean , variance = band_stats ( frame , width , y0 , y1 , args . x_step , args . y_step )
means . append ( mean )
variances . append ( variance )
lower = band_count / / 2
lower_flags = [ var < args . slab_var for var in variances [ lower : ] ]
low_var_run = max_run ( lower_flags ) / max ( 1 , len ( lower_flags ) )
mean_jumps = [ abs ( means [ idx ] - means [ idx - 1 ] ) for idx in range ( 1 , band_count ) ]
max_lower_jump = max ( mean_jumps [ lower : ] or [ 0.0 ] )
reasons : list [ str ] = [ ]
if sequence is None :
reasons . append ( " marker_decode_failed " )
elif previous_seq is not None :
if sequence == previous_seq :
reasons . append ( " frame_repeat " )
elif sequence > previous_seq + 1 :
reasons . append ( " frame_gap " )
elif sequence < previous_seq :
reasons . append ( " frame_backwards " )
if expected is not None :
if lower_mae > args . lower_mae_threshold and lower_mae > max ( upper_mae * args . lower_skew_ratio , args . lower_mae_threshold ) :
reasons . append ( " lower_half_tear " )
if total_mae > args . mae_threshold and lower_mae < = max ( upper_mae * args . lower_skew_ratio , args . lower_mae_threshold ) :
reasons . append ( " high_mae " )
if low_var_run > = 0.25 and lower_mae > args . lower_mae_threshold :
reasons . append ( " black_or_gray_slab " )
if shift_pixels and shift_zero_delta > args . shift_threshold and shift_improvement > args . shift_improvement :
reasons . append ( " horizontal_shift " )
return {
" suspicious " : bool ( reasons ) ,
" reasons " : reasons ,
" decoded_sequence " : sequence ,
" marker_uncertain_bits " : uncertain_bits ,
" upper_mae " : round ( upper_mae , 3 ) ,
" lower_mae " : round ( lower_mae , 3 ) ,
" total_mae " : round ( total_mae , 3 ) ,
" lower_low_variance_run_pct " : round ( low_var_run , 3 ) ,
" max_lower_jump " : round ( max_lower_jump , 3 ) ,
" shift_pixels " : shift_pixels ,
" shift_zero_delta " : round ( shift_zero_delta , 3 ) ,
" shift_best_delta " : round ( shift_best_delta , 3 ) ,
" shift_improvement " : round ( shift_improvement , 3 ) ,
}
def write_pgm ( path : pathlib . Path , frame : bytes , width : int , height : int ) - > None :
path . write_bytes ( f " P5 \n { width } { height } \n 255 \n " . encode ( ) + frame )
def run_capture ( args : argparse . Namespace ) - > int :
width , height , fps = mode_dimensions ( args )
command , capture_width , capture_height , device = ffmpeg_cmd ( args , width , height )
artifact_dir = pathlib . Path ( args . artifact_dir ) if args . artifact_dir else pathlib . Path ( " /tmp " ) / f " lesavka-synthetic-rct-capture- { timestamp ( ) } "
artifact_dir . mkdir ( parents = True , exist_ok = True )
frame_size = capture_width * capture_height
stderr_path = artifact_dir / " ffmpeg.stderr "
metrics_path = artifact_dir / " frame-metrics.jsonl "
2026-05-17 10:09:43 -03:00
capture_started = time . monotonic ( )
capture_elapsed = 0.0
analysis_elapsed = 0.0
raw_capture_bytes = 0
ffmpeg_rc : int | None = None
2026-05-16 06:48:01 -03:00
frame_index = 0
suspicious_count = 0
reference_artifacts = 0
suspicious_artifacts = 0
previous_seq : int | None = None
decoded_frames = 0
reason_counts : collections . Counter [ str ] = collections . Counter ( )
2026-05-17 00:42:08 -03:00
sequence_counts : collections . Counter [ int ] = collections . Counter ( )
2026-05-16 06:48:01 -03:00
max_total_mae = max_upper_mae = max_lower_mae = 0.0
worst : list [ dict [ str , Any ] ] = [ ]
2026-05-17 10:09:43 -03:00
def analyze_captured_frame ( frame : bytes , elapsed_s : float , metrics : Any ) - > None :
nonlocal frame_index , suspicious_count , reference_artifacts , suspicious_artifacts
nonlocal previous_seq , decoded_frames , max_total_mae , max_upper_mae , max_lower_mae , worst
frame_index + = 1
result = analyze_frame ( frame , capture_width , capture_height , args , previous_seq )
decoded_seq = result [ " decoded_sequence " ]
if decoded_seq is not None :
decoded_frames + = 1
sequence_counts [ int ( decoded_seq ) ] + = 1
previous_seq = int ( decoded_seq )
result . update ( { " frame " : frame_index , " elapsed_s " : round ( elapsed_s , 3 ) } )
max_total_mae = max ( max_total_mae , float ( result [ " total_mae " ] ) )
max_upper_mae = max ( max_upper_mae , float ( result [ " upper_mae " ] ) )
max_lower_mae = max ( max_lower_mae , float ( result [ " lower_mae " ] ) )
if result [ " suspicious " ] :
suspicious_count + = 1
reason_counts . update ( result [ " reasons " ] )
worst . append ( result )
worst = sorted ( worst , key = lambda item : ( item [ " lower_mae " ] , item [ " total_mae " ] ) , reverse = True ) [ : 30 ]
if suspicious_artifacts < args . max_suspicious_artifacts :
seq_label = " unknown " if decoded_seq is None else f " seq { decoded_seq : 08d } "
write_pgm ( artifact_dir / f " suspicious_ { frame_index : 06d } _ { seq_label } .pgm " , frame , capture_width , capture_height )
2026-05-16 06:48:01 -03:00
if decoded_seq is not None :
2026-05-17 10:09:43 -03:00
write_pgm (
artifact_dir / f " expected_ { frame_index : 06d } _ { seq_label } .pgm " ,
synthetic_gray ( capture_width , capture_height , int ( decoded_seq ) ) ,
capture_width ,
capture_height ,
)
suspicious_artifacts + = 1
should_reference = frame_index == 1 or ( args . reference_every > 0 and frame_index % args . reference_every == 0 )
if should_reference and reference_artifacts < args . max_reference_artifacts :
write_pgm ( artifact_dir / f " reference_ { frame_index : 06d } .pgm " , frame , capture_width , capture_height )
reference_artifacts + = 1
metrics . write ( json . dumps ( result , sort_keys = True ) + " \n " )
if frame_index % args . progress_every == 0 :
print ( f " frames= { frame_index } suspicious= { suspicious_count } latest= { result } " , file = sys . stderr )
with stderr_path . open ( " wb " ) as err , metrics_path . open ( " w " ) as metrics :
if args . stream_analyze :
( artifact_dir / " command.txt " ) . write_text ( " " . join ( shlex . quote ( part ) for part in command ) + " \n " )
proc = subprocess . Popen ( command , stdout = subprocess . PIPE , stderr = err )
assert proc . stdout is not None
capture_started = time . monotonic ( )
try :
while time . monotonic ( ) - capture_started < args . duration :
frame = proc . stdout . read ( frame_size )
if len ( frame ) != frame_size :
break
analyze_captured_frame ( frame , time . monotonic ( ) - capture_started , metrics )
finally :
proc . terminate ( )
try :
ffmpeg_rc = proc . wait ( timeout = 3 )
except subprocess . TimeoutExpired :
proc . kill ( )
ffmpeg_rc = proc . wait ( )
capture_elapsed = time . monotonic ( ) - capture_started
analysis_elapsed = capture_elapsed
else :
raw_path = artifact_dir / " capture.raw "
capture_command = command [ : ]
if " -an " in capture_command :
capture_command [ capture_command . index ( " -an " ) : capture_command . index ( " -an " ) ] = [ " -t " , str ( args . duration ) ]
else :
capture_command [ - 1 : - 1 ] = [ " -t " , str ( args . duration ) ]
capture_command [ - 1 ] = str ( raw_path )
( artifact_dir / " command.txt " ) . write_text ( " " . join ( shlex . quote ( part ) for part in capture_command ) + " \n " )
print ( f " capturing raw RCT frames before analysis: { raw_path } " , file = sys . stderr )
capture_started = time . monotonic ( )
proc = subprocess . run ( capture_command , stdout = subprocess . DEVNULL , stderr = err , check = False )
capture_elapsed = time . monotonic ( ) - capture_started
ffmpeg_rc = proc . returncode
raw_capture_bytes = raw_path . stat ( ) . st_size if raw_path . exists ( ) else 0
print (
f " analyzing captured raw RCT frames bytes= { raw_capture_bytes } capture_s= { capture_elapsed : .3f } " ,
file = sys . stderr ,
)
analysis_started = time . monotonic ( )
2026-05-16 06:48:01 -03:00
try :
2026-05-17 10:09:43 -03:00
with raw_path . open ( " rb " ) as raw :
while True :
frame = raw . read ( frame_size )
if len ( frame ) != frame_size :
break
analyze_captured_frame ( frame , frame_index / max ( 1 , fps ) , metrics )
finally :
raw_path . unlink ( missing_ok = True )
analysis_elapsed = time . monotonic ( ) - analysis_started
elapsed = max ( 0.001 , capture_elapsed )
2026-05-16 06:48:01 -03:00
summary = {
" schema " : " lesavka.synthetic-rct-capture.v1 " ,
" source " : args . source ,
" device " : device ,
" mode " : args . mode ,
2026-05-17 10:09:43 -03:00
" capture_mode " : " stream " if args . stream_analyze else " rawfile " ,
2026-05-16 06:48:01 -03:00
" width " : capture_width ,
" height " : capture_height ,
" fps_requested " : fps ,
" duration_requested_s " : args . duration ,
" duration_observed_s " : round ( elapsed , 3 ) ,
2026-05-17 10:09:43 -03:00
" analysis_duration_s " : round ( analysis_elapsed , 3 ) ,
" ffmpeg_rc " : ffmpeg_rc ,
" raw_capture_bytes " : raw_capture_bytes ,
2026-05-16 06:48:01 -03:00
" frames " : frame_index ,
" fps_observed " : round ( frame_index / elapsed , 3 ) ,
" decoded_frames " : decoded_frames ,
" decoded_pct " : round ( decoded_frames / frame_index * 100.0 , 3 ) if frame_index else 0.0 ,
" suspicious_frames " : suspicious_count ,
" suspicious_pct " : round ( suspicious_count / frame_index * 100.0 , 3 ) if frame_index else 0.0 ,
" reason_counts " : dict ( reason_counts ) ,
2026-05-17 00:42:08 -03:00
" decoded_sequence_counts " : dict ( sequence_counts . most_common ( 12 ) ) ,
2026-05-16 06:48:01 -03:00
" max_total_mae " : round ( max_total_mae , 3 ) ,
" max_upper_mae " : round ( max_upper_mae , 3 ) ,
" max_lower_mae " : round ( max_lower_mae , 3 ) ,
" worst_frames " : worst ,
" reference_artifacts " : reference_artifacts ,
" suspicious_artifacts " : suspicious_artifacts ,
" artifact_dir " : str ( artifact_dir ) ,
" ffmpeg_stderr " : str ( stderr_path ) ,
}
( artifact_dir / " summary.json " ) . write_text ( json . dumps ( summary , indent = 2 , sort_keys = True ) + " \n " )
( artifact_dir / " summary.txt " ) . write_text ( format_summary ( summary ) )
print ( format_summary ( summary ) , end = " " )
print ( f " artifact_dir: { artifact_dir } " )
return 0 if frame_index > 0 else 2
def format_summary ( summary : dict [ str , Any ] ) - > str :
return " \n " . join (
[
" Lesavka synthetic RCT UVC comparison probe " ,
f " source: { summary [ ' source ' ] } " ,
f " device: { summary [ ' device ' ] } " ,
f " mode: { summary [ ' mode ' ] } capture= { summary [ ' width ' ] } x { summary [ ' height ' ] } @ { summary [ ' fps_requested ' ] } " ,
f " frames: { summary [ ' frames ' ] } ( { summary [ ' fps_observed ' ] } fps observed) " ,
f " decoded markers: { summary [ ' decoded_frames ' ] } ( { summary [ ' decoded_pct ' ] } %) " ,
f " suspicious: { summary [ ' suspicious_frames ' ] } ( { summary [ ' suspicious_pct ' ] } %) " ,
f " reasons: { summary [ ' reason_counts ' ] } " ,
f " max mae: total= { summary [ ' max_total_mae ' ] } upper= { summary [ ' max_upper_mae ' ] } lower= { summary [ ' max_lower_mae ' ] } " ,
f " artifacts: { summary [ ' artifact_dir ' ] } " ,
" " ,
]
)
def run_self_test ( args : argparse . Namespace ) - > int :
width = 320
height = 180
frames = [ synthetic_gray ( width , height , idx ) for idx in range ( 6 ) ]
corrupt = bytearray ( synthetic_gray ( width , height , 6 ) )
2026-05-17 00:42:08 -03:00
fill_rect ( corrupt , width , height , 0 , height / / 2 , width , height / / 4 , 0 )
2026-05-16 06:48:01 -03:00
frames . append ( bytes ( corrupt ) )
shifted = bytearray ( width * height )
expected = synthetic_gray ( width , height , 7 )
for y in range ( height ) :
row = y * width
for x in range ( width ) :
src = min ( width - 1 , x + 24 )
shifted [ row + x ] = expected [ row + src ]
frames . append ( bytes ( shifted ) )
previous_seq : int | None = None
records : list [ dict [ str , Any ] ] = [ ]
suspicious = 0
for idx , frame in enumerate ( frames ) :
result = analyze_frame ( frame , width , height , args , previous_seq )
if result [ " decoded_sequence " ] is not None :
previous_seq = int ( result [ " decoded_sequence " ] )
result [ " frame " ] = idx
records . append ( result )
suspicious + = int ( bool ( result [ " suspicious " ] ) )
artifact_dir = pathlib . Path ( args . artifact_dir ) if args . artifact_dir else pathlib . Path ( " /tmp " ) / f " lesavka-synthetic-rct-self-test- { timestamp ( ) } "
artifact_dir . mkdir ( parents = True , exist_ok = True )
write_pgm ( artifact_dir / " reference_000001.pgm " , frames [ 0 ] , width , height )
summary = {
" schema " : " lesavka.synthetic-rct-probe.self-test.v1 " ,
" frames " : len ( frames ) ,
" suspicious_frames " : suspicious ,
" records " : records ,
" artifact_dir " : str ( artifact_dir ) ,
}
( artifact_dir / " summary.json " ) . write_text ( json . dumps ( summary , indent = 2 , sort_keys = True ) + " \n " )
print ( json . dumps ( summary , indent = 2 , sort_keys = True ) )
return 0 if suspicious > = 2 else 1
def main ( ) - > int :
args = parse_args ( )
if args . self_test :
return run_self_test ( args )
if args . capture_only :
return run_capture ( args )
return run_remote_orchestrated ( args )
if __name__ == " __main__ " :
raise SystemExit ( main ( ) )