2026-05-16 06:48:01 -03:00
#!/usr/bin/env python3
""" Run synthetic Lesavka uplink media and compare what the RCT receives. """
from __future__ import annotations
import argparse
import collections
import json
import os
import pathlib
import shlex
import shutil
import subprocess
import sys
import time
from typing import Any
# Substring matched case-insensitively against the header lines printed by
# `v4l2-ctl --list-devices`; it must not carry padding or nothing will match.
DEFAULT_DEVICE_LABEL = "Lesavka Composite"
# Baseline WIDTHxHEIGHT@FPS modes, quoted in the --mode help text.
DEFAULT_MODES = "1280x720@20,1280x720@30,1920x1080@20,1920x1080@30"
# Number of sequence bits encoded in the on-frame marker grid.
MARKER_BITS = 32
# Marker cells per grid row; the grid has ceil(MARKER_BITS / MARKER_COLUMNS) rows.
MARKER_COLUMNS = 16
def parse_args() -> argparse.Namespace:
    """Parse the probe's command line.

    Flags are registered in the same order as before so ``--help`` output is
    unchanged.

    Returns:
        The namespace produced by ``argparse`` from ``sys.argv``.
    """
    cli = argparse.ArgumentParser(
        description=(
            "Manual synthetic end-to-end probe: Theia sends sequence-coded media "
            "through StreamWebcamMedia while Tethys captures the received UVC/X11 "
            "frames and compares them to the generated source."
        )
    )
    add = cli.add_argument

    # Hosts and injector settings used by the orchestrating side.
    add("--inject-host", default="", help="Theia SSH host, e.g. titan-jh")
    add("--rct-host", default="", help="RCT SSH host, e.g. tethys")
    add("--server", default="http://127.0.0.1:50051")
    add("--inject-binary", default="/usr/local/bin/lesavka-synthetic-uplink")

    # Mode selection; a value of 0 for an override means "take it from --mode".
    add("--mode", default="1280x720@30", help=f"one mode; baseline set is {DEFAULT_MODES}")
    for flag, noun in (("--width", "width"), ("--height", "height"), ("--fps", "fps")):
        add(flag, type=int, default=0, help=f"override capture {noun}")
    add("--duration", type=float, default=300.0)

    # Capture source selection.
    add("--source", choices=["device", "x11"], default="device")
    add("--device", default="auto")
    add("--device-label", default=DEFAULT_DEVICE_LABEL)
    add("--display", default=":0")
    add("--crop", default="", help="x,y,width,height for --source x11")

    # Artifact locations; empty strings make the callers pick a default path.
    for flag in ("--artifact-dir", "--remote-rct-dir", "--remote-inject-dir"):
        add(flag, default="")

    add(
        "--capture-before-inject",
        action="store_true",
        help="start RCT capture before synthetic uplink; default starts uplink first so superseded injectors fail fast",
    )
    add("--inject-warmup-s", type=float, default=1.25)

    # Sampling, threshold and artifact-budget knobs read by the frame analysis.
    tuning = (
        ("--x-step", int, 8),
        ("--y-step", int, 4),
        ("--bands", int, 24),
        ("--mae-threshold", float, 18.0),
        ("--lower-mae-threshold", float, 28.0),
        ("--lower-skew-ratio", float, 1.8),
        ("--slab-var", float, 20.0),
        ("--shift-threshold", float, 16.0),
        ("--shift-improvement", float, 1.25),
        ("--max-suspicious-artifacts", int, 80),
        ("--max-reference-artifacts", int, 12),
        ("--reference-every", int, 900),
        ("--progress-every", int, 150),
    )
    for flag, kind, default in tuning:
        add(flag, type=kind, default=default)

    # --capture-only is hidden from --help: the orchestrator passes it to the
    # copy of this script it runs on the RCT host.
    add("--capture-only", action="store_true", help=argparse.SUPPRESS)
    add("--self-test", action="store_true")
    return cli.parse_args()
def timestamp() -> str:
    """Return the current UTC time formatted as ``YYYYMMDD-HHMMSS``."""
    now_utc = time.gmtime()
    return time.strftime("%Y%m%d-%H%M%S", now_utc)
def parse_mode(value: str) -> tuple[int, int, int]:
    """Split a ``WIDTHxHEIGHT@FPS`` string into integers.

    The value is lower-cased first, so ``1920X1080@30`` is accepted.

    Returns:
        ``(width, height, fps)``.

    Raises:
        SystemExit: if the value does not have the expected shape.
    """
    size, _, fps = value.lower().partition("@")
    width, _, height = size.partition("x")
    try:
        # A missing separator leaves an empty or compound field, which int()
        # rejects just like a non-numeric one.
        return int(width), int(height), int(fps)
    except ValueError as exc:
        raise SystemExit(f"--mode must look like WIDTHxHEIGHT@FPS, got {value!r}") from exc
def mode_dimensions(args: argparse.Namespace) -> tuple[int, int, int]:
    """Resolve the effective capture width, height and fps.

    Values come from ``--mode``; any non-zero ``--width``, ``--height`` or
    ``--fps`` replaces the corresponding component.

    Raises:
        SystemExit: if a resolved value is not positive, or ``--mode`` is malformed.
    """
    mode_width, mode_height, mode_fps = parse_mode(args.mode)
    # A zero override is falsy and therefore falls back to the mode's value.
    width = args.width or mode_width
    height = args.height or mode_height
    fps = args.fps or mode_fps
    if min(width, height, fps) <= 0:
        raise SystemExit("width, height, and fps must be positive")
    return width, height, fps
def default_artifact_dir(mode: str) -> pathlib.Path:
    """Return ``artifacts/synthetic-rct/<mode>-<timestamp>`` for a run.

    The ``@`` in the mode string is replaced with ``-`` in the directory name.
    """
    safe_mode = mode.replace("@", "-")
    run_name = f"{safe_mode}-{timestamp()}"
    return pathlib.Path("artifacts/synthetic-rct") / run_name
def run_remote_orchestrated(args: argparse.Namespace) -> int:
    """Drive a full probe run over SSH and collect the artifacts locally.

    Steps:
      1. Copy this script to the RCT host.
      2. Start the synthetic uplink on the inject host and the ``--capture-only``
         probe on the RCT host. The uplink goes first unless
         ``--capture-before-inject`` is given.
      3. Wait for either side to exit.
      4. Copy both remote artifact directories back with ``scp`` and write
         ``run-summary.json``.

    Returns:
        0 when both the capture and the injector exited with status 0, else 1.

    Raises:
        SystemExit: if a host is missing or ``ssh``/``scp`` are not on PATH.
        subprocess.CalledProcessError: if uploading the script to the RCT host fails.
    """
    if not args.inject_host or not args.rct_host:
        raise SystemExit("--inject-host and --rct-host are required unless --capture-only or --self-test is used")
    if not shutil.which("ssh") or not shutil.which("scp"):
        raise SystemExit("ssh and scp are required for the remote synthetic probe")

    width, height, fps = mode_dimensions(args)
    artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else default_artifact_dir(args.mode)
    artifact_dir.mkdir(parents=True, exist_ok=True)
    remote_rct_dir = args.remote_rct_dir or f"/tmp/lesavka-synthetic-rct-capture-{timestamp()}"
    remote_inject_dir = args.remote_inject_dir or f"/tmp/lesavka-synthetic-uplink-{timestamp()}"
    remote_script = f"/tmp/lesavka-synthetic-rct-probe-{os.getpid()}.py"

    # The RCT host runs this very file in --capture-only mode.
    script_text = pathlib.Path(__file__).read_text()
    subprocess.run(
        ["ssh", args.rct_host, f"cat > {shlex.quote(remote_script)} && chmod +x {shlex.quote(remote_script)}"],
        input=script_text,
        text=True,
        check=True,
    )

    capture_cmd = [
        "python3",
        remote_script,
        "--capture-only",
        "--mode",
        args.mode,
        "--width",
        str(width),
        "--height",
        str(height),
        "--fps",
        str(fps),
        "--duration",
        str(args.duration),
        "--source",
        args.source,
        "--device",
        args.device,
        "--device-label",
        args.device_label,
        "--display",
        args.display,
        "--crop",
        args.crop,
        "--artifact-dir",
        remote_rct_dir,
        "--x-step",
        str(args.x_step),
        "--y-step",
        str(args.y_step),
        "--bands",
        str(args.bands),
        "--mae-threshold",
        str(args.mae_threshold),
        "--lower-mae-threshold",
        str(args.lower_mae_threshold),
        "--lower-skew-ratio",
        str(args.lower_skew_ratio),
        "--slab-var",
        str(args.slab_var),
        "--shift-threshold",
        str(args.shift_threshold),
        "--shift-improvement",
        str(args.shift_improvement),
        "--max-suspicious-artifacts",
        str(args.max_suspicious_artifacts),
        "--max-reference-artifacts",
        str(args.max_reference_artifacts),
        "--reference-every",
        str(args.reference_every),
        "--progress-every",
        str(args.progress_every),
    ]

    # When the uplink starts first it has already been running for the warm-up
    # period by the time capture begins. Extend its run by that lead so the
    # 2-second safety margin still covers the end of the capture; otherwise a
    # healthy injector can exit first and be reported as a non-isolated run.
    # NOTE(review): assumes the injector counts --duration from its own start.
    inject_lead_s = 0.0 if args.capture_before_inject else max(0.0, args.inject_warmup_s)
    inject_cmd = [
        args.inject_binary,
        "--server",
        args.server,
        "--mode",
        args.mode,
        "--duration",
        str(args.duration + 2.0 + inject_lead_s),
        "--artifact-dir",
        remote_inject_dir,
        "--print-every",
        str(args.progress_every),
    ]

    # Shell-quoted so the recorded command line can be replayed verbatim.
    (artifact_dir / "orchestrator-command.txt").write_text(
        " ".join(shlex.quote(part) for part in sys.argv) + "\n"
    )
    (artifact_dir / "mode.json").write_text(
        json.dumps(
            {
                "schema": "lesavka.synthetic-rct-probe.run.v1",
                "mode": args.mode,
                "width": width,
                "height": height,
                "fps": fps,
                "source": args.source,
                "duration_s": args.duration,
                "inject_host": args.inject_host,
                "rct_host": args.rct_host,
            },
            indent=2,
            sort_keys=True,
        )
        + "\n"
    )

    # Human-readable notes explaining why a run may not be trustworthy.
    diagnosis: list[str] = []

    def start_capture() -> subprocess.Popen[Any]:
        """Launch the capture-only probe on the RCT host."""
        print(f"starting RCT capture on {args.rct_host}:{remote_rct_dir}", file=sys.stderr)
        return subprocess.Popen(["ssh", args.rct_host, " ".join(shlex.quote(part) for part in capture_cmd)])

    def start_inject() -> subprocess.Popen[Any]:
        """Launch the synthetic uplink on the inject host."""
        print(f"starting synthetic uplink on {args.inject_host}:{remote_inject_dir}", file=sys.stderr)
        return subprocess.Popen(["ssh", args.inject_host, " ".join(shlex.quote(part) for part in inject_cmd)])

    def stop_capture(process: subprocess.Popen[Any]) -> int | None:
        """Terminate the local ssh process for the capture, escalating to kill."""
        process.terminate()
        try:
            return process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            return process.wait()

    def wait_capture_or_inject_exit(
        capture_process: subprocess.Popen[Any], inject_process: subprocess.Popen[Any]
    ) -> tuple[int | None, int | None]:
        """Poll both processes and return ``(capture_rc, inject_rc)``.

        If capture finishes first, the injector is waited for. If the injector
        finishes first, the capture is stopped, because the remaining frames
        would no longer come from the synthetic source.
        """
        while True:
            capture_status = capture_process.poll()
            if capture_status is not None:
                return capture_status, inject_process.wait()
            inject_status = inject_process.poll()
            if inject_status is not None:
                diagnosis.append(
                    "synthetic uplink exited while RCT capture was still active; stopping capture because the run is not isolated"
                )
                print(
                    f"synthetic uplink exited during capture rc={inject_status}; stopping RCT capture",
                    file=sys.stderr,
                )
                return stop_capture(capture_process), inject_status
            time.sleep(0.25)

    capture: subprocess.Popen[Any] | None = None
    if args.capture_before_inject:
        capture = start_capture()
        time.sleep(1.0)
        inject = start_inject()
        capture_rc, inject_rc = wait_capture_or_inject_exit(capture, inject)
    else:
        inject = start_inject()
        time.sleep(max(0.0, args.inject_warmup_s))
        inject_rc = inject.poll()
        if inject_rc is not None:
            # The injector died during warm-up, so there is nothing to capture.
            capture_rc = None
            diagnosis.append(
                "synthetic uplink exited before capture warmup completed; disconnect the live client or pause upstream webcam before running the isolated probe"
            )
            print(f"synthetic uplink exited before capture started rc={inject_rc}", file=sys.stderr)
        else:
            capture = start_capture()
            capture_rc, inject_rc = wait_capture_or_inject_exit(capture, inject)

    # Artifact retrieval is best-effort: a failed copy must not hide the run result.
    local_capture = artifact_dir / "capture"
    local_inject = artifact_dir / "inject"
    if capture is not None:
        subprocess.run(["scp", "-r", f"{args.rct_host}:{remote_rct_dir}", str(local_capture)], check=False)
    subprocess.run(["scp", "-r", f"{args.inject_host}:{remote_inject_dir}", str(local_inject)], check=False)

    capture_summary = local_capture / "summary.json"
    if capture_summary.exists():
        try:
            capture_data = json.loads(capture_summary.read_text())
            decoded_pct = float(capture_data.get("decoded_pct") or 0.0)
        except (OSError, ValueError, TypeError, AttributeError) as exc:
            # Still best-effort, but record the problem instead of hiding it.
            diagnosis.append(f"could not interpret capture summary {capture_summary}: {exc}")
        else:
            if inject_rc != 0 and decoded_pct < 80.0:
                diagnosis.append(
                    "captured frames did not consistently contain synthetic markers and the injector failed; the RCT capture likely measured a mixed, previous, or live webcam stream"
                )

    summary = {
        "schema": "lesavka.synthetic-rct-probe.orchestrator.v1",
        "mode": args.mode,
        "capture_rc": capture_rc,
        "inject_rc": inject_rc,
        "diagnosis": diagnosis,
        "artifact_dir": str(artifact_dir),
        "capture_artifacts": str(local_capture),
        "inject_artifacts": str(local_inject),
    }
    rendered = json.dumps(summary, indent=2, sort_keys=True)
    (artifact_dir / "run-summary.json").write_text(rendered + "\n")
    print(rendered)
    print(f"artifact_dir: {artifact_dir}")
    return 0 if capture_rc == 0 and inject_rc == 0 else 1
def detect_video_device(label: str) -> str:
    """Pick the V4L2 device node to capture from.

    Resolution order:
      1. the ``LESAVKA_RCT_UVC_DEVICE`` environment variable, if non-empty;
      2. the first ``/dev/video*`` node listed under a ``v4l2-ctl
         --list-devices`` header containing ``label`` (case-insensitive);
      3. ``/dev/video2``, also used when ``v4l2-ctl`` cannot be run.
    """
    fallback = "/dev/video2"
    override = os.environ.get("LESAVKA_RCT_UVC_DEVICE")
    if override:
        return override
    try:
        listing = subprocess.check_output(["v4l2-ctl", "--list-devices"], text=True)
    except Exception:
        # Deliberately broad: any failure to list devices means "use the fallback".
        return fallback

    wanted = label.lower()
    in_matching_group = False
    for line in listing.splitlines():
        if line.startswith(("\t", " ")):
            # Indented lines are the device nodes of the current header.
            node = line.strip()
            if in_matching_group and node.startswith("/dev/video"):
                return node
        else:
            # Unindented lines start a new device group.
            in_matching_group = wanted in line.lower()
    return fallback
def parse_crop(args: argparse.Namespace, width: int, height: int) -> tuple[int, int, int, int]:
    """Parse ``--crop`` into ``(x, y, width, height)``.

    An empty ``--crop`` selects the full ``width`` x ``height`` area at the origin.

    Raises:
        SystemExit: if the value does not have four comma-separated integer
            fields, or the crop width or height is not positive.
    """
    if not args.crop:
        return 0, 0, width, height
    parts = [part.strip() for part in args.crop.split(",")]
    if len(parts) != 4:
        raise SystemExit("--crop must be x,y,width,height")
    try:
        x, y, crop_width, crop_height = (int(part) for part in parts)
    except ValueError as exc:
        # Report bad numbers the same way parse_mode does, not as a traceback.
        raise SystemExit(f"--crop must be x,y,width,height with integer values, got {args.crop!r}") from exc
    if crop_width <= 0 or crop_height <= 0:
        raise SystemExit("--crop width and height must be positive")
    return x, y, crop_width, crop_height
def ffmpeg_cmd(args: argparse.Namespace, width: int, height: int) -> tuple[list[str], int, int, str]:
    """Build the ffmpeg command that streams raw 8-bit gray frames to stdout.

    Returns:
        ``(command, frame_width, frame_height, input_name)``. ``input_name`` is
        the X11 display spec (with crop offset) for ``--source x11``, otherwise
        the V4L2 device path.
    """

    def build(input_options: list[str], frame_width: int, frame_height: int, target: str) -> list[str]:
        # --fps wins when non-zero; otherwise the rate comes from --mode.
        framerate = str(args.fps or parse_mode(args.mode)[2])
        return [
            "ffmpeg",
            "-hide_banner",
            "-nostdin",
            "-loglevel",
            "warning",
            *input_options,
            "-video_size",
            f"{frame_width}x{frame_height}",
            "-framerate",
            framerate,
            "-i",
            target,
            "-an",
            "-pix_fmt",
            "gray",
            "-f",
            "rawvideo",
            "-",
        ]

    if args.source == "x11":
        x, y, capture_width, capture_height = parse_crop(args, width, height)
        display = f"{args.display}+{x},{y}"
        command = build(["-f", "x11grab"], capture_width, capture_height, display)
        return command, capture_width, capture_height, display

    if args.device == "auto":
        device = detect_video_device(args.device_label)
    else:
        device = args.device
    command = build(["-f", "v4l2", "-input_format", "mjpeg"], width, height, device)
    return command, width, height, device
def marker_cell(width: int, height: int) -> int:
    """Return the marker cell edge in pixels: shorter side // 80, clamped to 6..16."""
    shortest_side = width if width < height else height
    cell = shortest_side // 80
    if cell > 16:
        cell = 16
    if cell < 6:
        cell = 6
    return cell
def fill_rect(frame: bytearray, width: int, height: int, x0: int, y0: int, w: int, h: int, value: int) -> None:
    """Set a ``w`` x ``h`` rectangle at ``(x0, y0)`` to ``value``, in place.

    ``frame`` is indexed as ``y * width + x``. The rectangle is clipped to the
    ``width`` x ``height`` area, so out-of-range parts are ignored.
    """
    left, right = max(0, x0), min(width, x0 + w)
    top, bottom = max(0, y0), min(height, y0 + h)
    for y in range(top, bottom):
        base = y * width
        for offset in range(base + left, base + right):
            frame[offset] = value
def synthetic_gray(width: int, height: int, sequence: int) -> bytes:
    """Generate the reference gray frame for ``sequence``.

    Layers, applied per pixel in this order:
      1. a diagonal gradient that advances with the sequence number;
      2. periodic vertical bars, brightened by 70 (capped at 255), whose
         offset moves with the sequence;
      3. a centre box that inverts and halves the value;
      4. in the lower half, a 32x24 checkerboard that divides alternate tiles
         by three, with phase flipping on each sequence.
    The sequence marker is drawn on top.

    Returns:
        ``width * height`` bytes, row-major.
    """
    pixels = bytearray(width * height)
    period = max(width // 3, 64)
    bar_width = max(width // 18, 12)
    bar_offset = (sequence * 17) % period
    mid_x, mid_y = width // 2, height // 2
    box_half_w, box_half_h = width // 9, height // 12

    index = 0
    for y in range(height):
        # Terms that depend only on the row are computed once per row.
        gradient_row = y * 5 + sequence * 11
        in_box_rows = abs(y - mid_y) < box_half_h
        in_lower_half = y >= mid_y
        checker_row = y // 24 + sequence
        for x in range(width):
            shade = (x * 3 + gradient_row) & 0xFF
            if (x + bar_offset) % period < bar_width:
                shade = min(255, shade + 70)
            if in_box_rows and abs(x - mid_x) < box_half_w:
                shade = 255 - shade // 2
            if in_lower_half and ((x // 32 + checker_row) & 1) == 0:
                shade //= 3
            pixels[index] = shade
            index += 1

    draw_marker(pixels, width, height, sequence)
    return bytes(pixels)
def draw_marker(frame: bytearray, width: int, height: int, sequence: int) -> None:
    """Draw the sequence marker into the top-left corner of ``frame``, in place.

    The marker is a dark (32) panel with a white and a black corner cell above
    a grid of ``MARKER_BITS`` cells. Bit ``i`` of ``sequence`` goes to row
    ``i // MARKER_COLUMNS``, column ``i % MARKER_COLUMNS``: 255 for a set bit,
    0 for a clear one. Nothing is drawn when the frame is too small to hold
    the grid plus a two-cell margin on every side.
    """
    cell = marker_cell(width, height)
    rows = -(-MARKER_BITS // MARKER_COLUMNS)  # ceiling division
    if width < (MARKER_COLUMNS + 4) * cell or height < (rows + 4) * cell:
        return

    origin = 2 * cell  # the grid starts two cells in from the top and left
    # Panel with a one-cell border around the bit grid.
    fill_rect(frame, width, height, cell, cell, (MARKER_COLUMNS + 2) * cell, (rows + 2) * cell, 32)
    # White cell at the panel's top-left corner, black cell at its top-right.
    fill_rect(frame, width, height, origin - cell, origin - cell, cell, cell, 255)
    fill_rect(frame, width, height, origin + MARKER_COLUMNS * cell, origin - cell, cell, cell, 0)

    for bit in range(MARKER_BITS):
        grid_row, grid_col = divmod(bit, MARKER_COLUMNS)
        shade = 255 if (sequence >> bit) & 1 else 0
        fill_rect(frame, width, height, origin + grid_col * cell, origin + grid_row * cell, cell, cell, shade)
def cell_mean(frame: bytes, width: int, x0: int, y0: int, cell: int) -> float:
    """Return the mean value of the interior of one marker cell.

    A border of ``max(1, cell // 4)`` pixels is skipped on every side. When no
    interior pixels remain the result is 0.0.
    """
    inset = max(1, cell // 4)
    columns = range(x0 + inset, x0 + cell - inset)
    total = 0
    count = 0
    for y in range(y0 + inset, y0 + cell - inset):
        base = y * width
        total += sum(frame[base + x] for x in columns)
        count += len(columns)
    return total / max(1, count)
def decode_sequence(frame: bytes, width: int, height: int) -> tuple[int | None, int]:
    """Read the sequence number back from the marker grid.

    Each cell is classified by its interior mean: above 165 is a set bit,
    below 90 a clear bit, and anything in between counts as uncertain (and is
    read as clear).

    Returns:
        ``(sequence, uncertain_bits)``. ``sequence`` is ``None`` when more
        than six cells are uncertain, or when the frame is too small to hold
        a marker; in the latter case ``uncertain_bits`` is ``MARKER_BITS``.
    """
    cell = marker_cell(width, height)
    rows = -(-MARKER_BITS // MARKER_COLUMNS)  # ceiling division
    if width < (MARKER_COLUMNS + 4) * cell or height < (rows + 4) * cell:
        return None, MARKER_BITS

    origin = 2 * cell
    decoded = 0
    ambiguous = 0
    for bit in range(MARKER_BITS):
        grid_row, grid_col = divmod(bit, MARKER_COLUMNS)
        brightness = cell_mean(frame, width, origin + grid_col * cell, origin + grid_row * cell, cell)
        if brightness > 165:
            decoded |= 1 << bit
        elif brightness >= 90:
            ambiguous += 1

    if ambiguous > 6:
        return None, ambiguous
    return decoded, ambiguous
def sampled_abs_delta(a: bytes, b: bytes, width: int, y0: int, y1: int, x_step: int, y_step: int) -> float:
    """Return the mean absolute difference between two frames on a sparse grid.

    Rows ``y0`` to ``y1`` (exclusive) are sampled every ``y_step`` rows, and
    each row every ``x_step`` columns. With no samples the result is 0.0.
    """
    total = 0
    count = 0
    for y in range(y0, y1, y_step):
        base = y * width
        deltas = [abs(a[i] - b[i]) for i in range(base, base + width, x_step)]
        total += sum(deltas)
        count += len(deltas)
    return total / max(1, count)
def band_stats(frame: bytes, width: int, y0: int, y1: int, x_step: int, y_step: int) -> tuple[float, float]:
    """Return ``(mean, variance)`` of sampled pixels in rows ``y0`` to ``y1``.

    Sampling uses every ``y_step``-th row and every ``x_step``-th column.
    The variance is clamped at zero to absorb rounding error. With no samples
    both values are 0.
    """
    total = 0
    total_sq = 0
    count = 0
    for y in range(y0, y1, y_step):
        base = y * width
        samples = [frame[i] for i in range(base, base + width, x_step)]
        total += sum(samples)
        total_sq += sum(sample * sample for sample in samples)
        count += len(samples)
    divisor = max(1, count)
    mean = total / divisor
    return mean, max(0.0, total_sq / divisor - mean * mean)
def shifted_expected_delta(frame: bytes, expected: bytes, width: int, height: int, shift: int, args: argparse.Namespace) -> float:
    """Return the mean absolute difference of ``frame`` against a shifted ``expected``.

    Pixel ``x`` of ``frame`` is compared with pixel ``x + shift`` of
    ``expected``, restricted to columns where both exist. Only rows from
    ``height // 4`` downward are sampled, on the ``args.x_step`` by
    ``args.y_step`` grid. Returns 0.0 when the shift leaves no overlapping
    columns or no samples.
    """
    first_col = max(0, -shift)
    end_col = min(width, width - shift)
    if first_col >= end_col:
        return 0.0

    total = 0
    count = 0
    for y in range(height // 4, height, args.y_step):
        base = y * width
        deltas = [
            abs(frame[i] - expected[i + shift])
            for i in range(base + first_col, base + end_col, args.x_step)
        ]
        total += sum(deltas)
        count += len(deltas)
    return total / max(1, count)
def best_expected_shift(frame: bytes, expected: bytes, width: int, height: int, args: argparse.Namespace) -> tuple[int, float, float, float]:
    """Find the horizontal shift that best aligns ``frame`` with ``expected``.

    Candidate shifts of 8 to 128 pixels in both directions are compared with
    the unshifted delta. A candidate wins only if it is strictly better than
    everything tried before it.

    Returns:
        ``(best_shift, zero_delta, best_delta, improvement)``. ``improvement``
        is ``zero_delta / best_delta`` with the divisor floored at 0.001, or
        1.0 when no shift beats the unshifted comparison.
    """
    zero = shifted_expected_delta(frame, expected, width, height, 0, args)
    best = zero
    best_shift = 0

    magnitudes = (8, 12, 16, 24, 32, 48, 64, 80, 96, 128)
    # Same order as before: most negative first, then ascending positive.
    candidates = [-magnitude for magnitude in reversed(magnitudes)] + list(magnitudes)
    for shift in candidates:
        if abs(shift) >= width:
            # No overlapping columns: the delta helper reports 0.0 for this
            # case, which would otherwise win as a bogus perfect match.
            continue
        candidate = shifted_expected_delta(frame, expected, width, height, shift, args)
        if candidate < best:
            best = candidate
            best_shift = shift

    improvement = zero / max(best, 0.001) if best_shift else 1.0
    return best_shift, zero, best, improvement
def max_run(flags: list[bool]) -> int:
    """Return the length of the longest run of consecutive truthy flags."""
    longest = 0
    streak = 0
    for flag in flags:
        if flag:
            streak += 1
            if streak > longest:
                longest = streak
        else:
            streak = 0
    return longest
def analyze_frame(
    frame: bytes,
    width: int,
    height: int,
    args: argparse.Namespace,
    previous_seq: int | None,
) -> dict[str, Any]:
    """Compare one captured frame with the synthetic frame it claims to be.

    The marker is decoded to obtain the sequence number, and the matching
    reference frame is regenerated. From these the function derives
    upper/lower/total mean absolute error, the best horizontal shift, and
    per-band mean and variance.

    Args:
        frame: captured gray frame, ``width * height`` bytes.
        args: parsed options supplying sampling steps and thresholds.
        previous_seq: last successfully decoded sequence, or ``None``.

    Returns:
        A JSON-serialisable dict with the metrics, the ``reasons`` list and a
        ``suspicious`` flag that is true when any reason was recorded.
    """
    sequence, uncertain_bits = decode_sequence(frame, width, height)
    expected = None if sequence is None else synthetic_gray(width, height, sequence)

    # Metrics stay at zero when the marker could not be decoded.
    upper_mae = lower_mae = total_mae = 0.0
    shift_pixels = 0
    shift_zero_delta = shift_best_delta = shift_improvement = 0.0
    half = height // 2
    if expected is not None:
        upper_mae = sampled_abs_delta(frame, expected, width, 0, half, args.x_step, args.y_step)
        lower_mae = sampled_abs_delta(frame, expected, width, half, height, args.x_step, args.y_step)
        total_mae = sampled_abs_delta(frame, expected, width, 0, height, args.x_step, args.y_step)
        shift_pixels, shift_zero_delta, shift_best_delta, shift_improvement = best_expected_shift(
            frame, expected, width, height, args
        )

    # Horizontal band statistics; the last band absorbs any remainder rows.
    band_count = max(8, args.bands)
    band_h = max(1, height // band_count)
    means: list[float] = []
    variances: list[float] = []
    for band in range(band_count):
        top = band * band_h
        bottom = height if band == band_count - 1 else min(height, top + band_h)
        mean, variance = band_stats(frame, width, top, bottom, args.x_step, args.y_step)
        means.append(mean)
        variances.append(variance)

    lower = band_count // 2
    flat_lower_bands = [variance < args.slab_var for variance in variances[lower:]]
    low_var_run = max_run(flat_lower_bands) / max(1, len(flat_lower_bands))
    mean_jumps = [abs(after - before) for before, after in zip(means, means[1:])]
    max_lower_jump = max(mean_jumps[lower:] or [0.0])

    reasons: list[str] = []

    # Sequence continuity relative to the previously decoded frame.
    if sequence is None:
        reasons.append("marker_decode_failed")
    elif previous_seq is not None:
        if sequence == previous_seq:
            reasons.append("frame_repeat")
        elif sequence > previous_seq + 1:
            reasons.append("frame_gap")
        elif sequence < previous_seq:
            reasons.append("frame_backwards")

    # Content checks need the regenerated reference frame.
    if expected is not None:
        skew_limit = max(upper_mae * args.lower_skew_ratio, args.lower_mae_threshold)
        if lower_mae > args.lower_mae_threshold and lower_mae > skew_limit:
            reasons.append("lower_half_tear")
        if total_mae > args.mae_threshold and lower_mae <= skew_limit:
            reasons.append("high_mae")
        if low_var_run >= 0.25 and lower_mae > args.lower_mae_threshold:
            reasons.append("black_or_gray_slab")
        if shift_pixels and shift_zero_delta > args.shift_threshold and shift_improvement > args.shift_improvement:
            reasons.append("horizontal_shift")

    return {
        "suspicious": bool(reasons),
        "reasons": reasons,
        "decoded_sequence": sequence,
        "marker_uncertain_bits": uncertain_bits,
        "upper_mae": round(upper_mae, 3),
        "lower_mae": round(lower_mae, 3),
        "total_mae": round(total_mae, 3),
        "lower_low_variance_run_pct": round(low_var_run, 3),
        "max_lower_jump": round(max_lower_jump, 3),
        "shift_pixels": shift_pixels,
        "shift_zero_delta": round(shift_zero_delta, 3),
        "shift_best_delta": round(shift_best_delta, 3),
        "shift_improvement": round(shift_improvement, 3),
    }
def write_pgm(path: pathlib.Path, frame: bytes, width: int, height: int) -> None:
    """Write ``frame`` to ``path`` as a binary (P5) PGM with max value 255."""
    header = f"P5\n{width} {height}\n255\n".encode()
    path.write_bytes(header + frame)
def run_capture(args: argparse.Namespace) -> int:
    """Capture frames with ffmpeg, analyse each one and write the artifacts.

    Files written to the artifact directory:
      * ``command.txt``: the shell-quoted ffmpeg command;
      * ``ffmpeg.stderr``: ffmpeg's diagnostics;
      * ``frame-metrics.jsonl``: one analysis record per frame;
      * ``suspicious_*.pgm`` / ``expected_*.pgm``: flagged frames and their
        references, capped by ``--max-suspicious-artifacts``;
      * ``reference_*.pgm``: periodic frames, capped by ``--max-reference-artifacts``;
      * ``summary.json`` and ``summary.txt``.

    Returns:
        0 if at least one complete frame was read, otherwise 2.
    """
    width, height, fps = mode_dimensions(args)
    command, capture_width, capture_height, device = ffmpeg_cmd(args, width, height)
    artifact_dir = (
        pathlib.Path(args.artifact_dir)
        if args.artifact_dir
        else pathlib.Path("/tmp") / f"lesavka-synthetic-rct-capture-{timestamp()}"
    )
    artifact_dir.mkdir(parents=True, exist_ok=True)
    frame_size = capture_width * capture_height  # one byte per pixel (gray)
    (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n")
    stderr_path = artifact_dir / "ffmpeg.stderr"
    metrics_path = artifact_dir / "frame-metrics.jsonl"

    started = time.monotonic()
    frame_index = 0
    suspicious_count = 0
    reference_artifacts = 0
    suspicious_artifacts = 0
    previous_seq: int | None = None
    decoded_frames = 0
    reason_counts: collections.Counter[str] = collections.Counter()
    max_total_mae = max_upper_mae = max_lower_mae = 0.0
    worst: list[dict[str, Any]] = []

    with stderr_path.open("wb") as err, metrics_path.open("w") as metrics:
        proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err)
        assert proc.stdout is not None
        try:
            while time.monotonic() - started < args.duration:
                frame = proc.stdout.read(frame_size)
                if len(frame) != frame_size:
                    # A short read means ffmpeg closed its output.
                    break
                frame_index += 1
                result = analyze_frame(frame, capture_width, capture_height, args, previous_seq)
                decoded_seq = result["decoded_sequence"]
                if decoded_seq is not None:
                    decoded_frames += 1
                    previous_seq = int(decoded_seq)
                result.update({"frame": frame_index, "elapsed_s": round(time.monotonic() - started, 3)})
                max_total_mae = max(max_total_mae, float(result["total_mae"]))
                max_upper_mae = max(max_upper_mae, float(result["upper_mae"]))
                max_lower_mae = max(max_lower_mae, float(result["lower_mae"]))

                if result["suspicious"]:
                    suspicious_count += 1
                    reason_counts.update(result["reasons"])
                    # Keep only the 30 worst frames, ranked by lower-half error.
                    worst.append(result)
                    worst = sorted(
                        worst,
                        key=lambda item: (item["lower_mae"], item["total_mae"]),
                        reverse=True,
                    )[:30]
                    if suspicious_artifacts < args.max_suspicious_artifacts:
                        seq_label = "unknown" if decoded_seq is None else f"seq{decoded_seq:08d}"
                        write_pgm(
                            artifact_dir / f"suspicious_{frame_index:06d}_{seq_label}.pgm",
                            frame,
                            capture_width,
                            capture_height,
                        )
                        if decoded_seq is not None:
                            write_pgm(
                                artifact_dir / f"expected_{frame_index:06d}_{seq_label}.pgm",
                                synthetic_gray(capture_width, capture_height, int(decoded_seq)),
                                capture_width,
                                capture_height,
                            )
                        suspicious_artifacts += 1

                should_reference = frame_index == 1 or (
                    args.reference_every > 0 and frame_index % args.reference_every == 0
                )
                if should_reference and reference_artifacts < args.max_reference_artifacts:
                    write_pgm(artifact_dir / f"reference_{frame_index:06d}.pgm", frame, capture_width, capture_height)
                    reference_artifacts += 1

                metrics.write(json.dumps(result, sort_keys=True) + "\n")
                # Guarded like --reference-every: a non-positive value turns
                # progress output off instead of dividing by zero.
                if args.progress_every > 0 and frame_index % args.progress_every == 0:
                    print(f"frames={frame_index} suspicious={suspicious_count} latest={result}", file=sys.stderr)
        finally:
            proc.terminate()
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()  # reap the killed process so it does not linger
            proc.stdout.close()

    elapsed = max(0.001, time.monotonic() - started)
    summary = {
        "schema": "lesavka.synthetic-rct-capture.v1",
        "source": args.source,
        "device": device,
        "mode": args.mode,
        "width": capture_width,
        "height": capture_height,
        "fps_requested": fps,
        "duration_requested_s": args.duration,
        "duration_observed_s": round(elapsed, 3),
        "frames": frame_index,
        "fps_observed": round(frame_index / elapsed, 3),
        "decoded_frames": decoded_frames,
        "decoded_pct": round(decoded_frames / frame_index * 100.0, 3) if frame_index else 0.0,
        "suspicious_frames": suspicious_count,
        "suspicious_pct": round(suspicious_count / frame_index * 100.0, 3) if frame_index else 0.0,
        "reason_counts": dict(reason_counts),
        "max_total_mae": round(max_total_mae, 3),
        "max_upper_mae": round(max_upper_mae, 3),
        "max_lower_mae": round(max_lower_mae, 3),
        "worst_frames": worst,
        "reference_artifacts": reference_artifacts,
        "suspicious_artifacts": suspicious_artifacts,
        "artifact_dir": str(artifact_dir),
        "ffmpeg_stderr": str(stderr_path),
    }
    (artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n")
    report = format_summary(summary)
    (artifact_dir / "summary.txt").write_text(report)
    print(report, end="")
    print(f"artifact_dir: {artifact_dir}")
    return 0 if frame_index > 0 else 2
def format_summary(summary: dict[str, Any]) -> str:
    """Render a capture summary dict as a newline-terminated text report."""
    s = summary
    lines = [
        "Lesavka synthetic RCT UVC comparison probe",
        f"source: {s['source']}",
        f"device: {s['device']}",
        f"mode: {s['mode']} capture={s['width']}x{s['height']}@{s['fps_requested']}",
        f"frames: {s['frames']} ({s['fps_observed']} fps observed)",
        f"decoded markers: {s['decoded_frames']} ({s['decoded_pct']}%)",
        f"suspicious: {s['suspicious_frames']} ({s['suspicious_pct']}%)",
        f"reasons: {s['reason_counts']}",
        f"max mae: total={s['max_total_mae']} upper={s['max_upper_mae']} lower={s['max_lower_mae']}",
        f"artifacts: {s['artifact_dir']}",
    ]
    # Terminate every line, including the last one.
    return "".join(line + "\n" for line in lines)
def run_self_test(args: argparse.Namespace) -> int:
    """Exercise the analysis offline on generated 320x180 frames.

    Eight frames are analysed: six clean frames (sequences 0 to 5), frame 6
    with a flat gray slab painted over part of its lower half, and frame 7
    shifted 24 pixels to the left with the last column repeated.

    Returns:
        0 if at least two frames were flagged suspicious, otherwise 1.
    """
    width, height = 320, 180
    shift = 24

    frames = [synthetic_gray(width, height, idx) for idx in range(6)]

    # Frame 6: overwrite the band from height/2 to 3*height/4 with gray.
    corrupt = bytearray(synthetic_gray(width, height, 6))
    fill_rect(corrupt, width, height, 0, height // 2, width, height // 4, 128)
    frames.append(bytes(corrupt))

    # Frame 7: each row moved left by `shift`, padded with its last pixel.
    expected = synthetic_gray(width, height, 7)
    shifted_rows = []
    for y in range(height):
        line = expected[y * width : (y + 1) * width]
        shifted_rows.append(line[shift:] + line[-1:] * shift)
    frames.append(b"".join(shifted_rows))

    previous_seq: int | None = None
    records: list[dict[str, Any]] = []
    suspicious = 0
    for idx, frame in enumerate(frames):
        result = analyze_frame(frame, width, height, args, previous_seq)
        decoded = result["decoded_sequence"]
        if decoded is not None:
            previous_seq = int(decoded)
        result["frame"] = idx
        records.append(result)
        if result["suspicious"]:
            suspicious += 1

    if args.artifact_dir:
        artifact_dir = pathlib.Path(args.artifact_dir)
    else:
        artifact_dir = pathlib.Path("/tmp") / f"lesavka-synthetic-rct-self-test-{timestamp()}"
    artifact_dir.mkdir(parents=True, exist_ok=True)
    write_pgm(artifact_dir / "reference_000001.pgm", frames[0], width, height)

    summary = {
        "schema": "lesavka.synthetic-rct-probe.self-test.v1",
        "frames": len(frames),
        "suspicious_frames": suspicious,
        "records": records,
        "artifact_dir": str(artifact_dir),
    }
    rendered = json.dumps(summary, indent=2, sort_keys=True)
    (artifact_dir / "summary.json").write_text(rendered + "\n")
    print(rendered)
    return 0 if suspicious >= 2 else 1
def main() -> int:
    """Dispatch to the self-test, the capture-only probe or the orchestrator.

    ``--self-test`` takes precedence over ``--capture-only``.

    Returns:
        The exit status of the selected mode.
    """
    args = parse_args()
    if args.self_test:
        runner = run_self_test
    elif args.capture_only:
        runner = run_capture
    else:
        runner = run_remote_orchestrated
    return runner(args)


if __name__ == "__main__":
    raise SystemExit(main())