probe: clarify synthetic stream preemption

This commit is contained in:
Brad Stein 2026-05-17 01:53:40 -03:00
parent 4988956f9c
commit 907df8f91b
7 changed files with 83 additions and 19 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.22.51" version = "0.22.52"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.22.51" version = "0.22.52"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.22.51" version = "0.22.52"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.22.51" version = "0.22.52"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.22.51" version = "0.22.52"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -34,6 +34,7 @@ def parse_args() -> argparse.Namespace:
) )
) )
parser.add_argument("--inject-host", default="", help="Theia SSH host, e.g. titan-jh") parser.add_argument("--inject-host", default="", help="Theia SSH host, e.g. titan-jh")
parser.add_argument("--local-inject", action="store_true", help="run the synthetic injector directly on this host")
parser.add_argument("--rct-host", default="", help="RCT SSH host, e.g. tethys") parser.add_argument("--rct-host", default="", help="RCT SSH host, e.g. tethys")
parser.add_argument("--server", default="https://127.0.0.1:50051") parser.add_argument("--server", default="https://127.0.0.1:50051")
parser.add_argument("--inject-binary", default="/usr/local/bin/lesavka-synthetic-uplink") parser.add_argument("--inject-binary", default="/usr/local/bin/lesavka-synthetic-uplink")
@ -124,8 +125,10 @@ def default_artifact_dir(mode: str) -> pathlib.Path:
def run_remote_orchestrated(args: argparse.Namespace) -> int: def run_remote_orchestrated(args: argparse.Namespace) -> int:
if not args.inject_host or not args.rct_host: if (not args.inject_host and not args.local_inject) or not args.rct_host:
raise SystemExit("--inject-host and --rct-host are required unless --capture-only or --self-test is used") raise SystemExit(
"--rct-host and either --inject-host or --local-inject are required unless --capture-only or --self-test is used"
)
if not shutil.which("ssh") or not shutil.which("scp"): if not shutil.which("ssh") or not shutil.which("scp"):
raise SystemExit("ssh and scp are required for the remote synthetic probe") raise SystemExit("ssh and scp are required for the remote synthetic probe")
width, height, fps = mode_dimensions(args) width, height, fps = mode_dimensions(args)
@ -227,6 +230,7 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int:
"jpeg_quality": args.jpeg_quality, "jpeg_quality": args.jpeg_quality,
"inject_max_frame_bytes": inject_max_frame_bytes, "inject_max_frame_bytes": inject_max_frame_bytes,
"inject_host": args.inject_host, "inject_host": args.inject_host,
"local_inject": args.local_inject,
"rct_host": args.rct_host, "rct_host": args.rct_host,
}, },
indent=2, indent=2,
@ -239,6 +243,9 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int:
return subprocess.Popen(["ssh", args.rct_host, " ".join(shlex.quote(part) for part in capture_cmd)]) return subprocess.Popen(["ssh", args.rct_host, " ".join(shlex.quote(part) for part in capture_cmd)])
def start_inject() -> subprocess.Popen[Any]: def start_inject() -> subprocess.Popen[Any]:
if args.local_inject:
print(f"starting local synthetic uplink: {remote_inject_dir}", file=sys.stderr)
return subprocess.Popen(inject_cmd)
print(f"starting synthetic uplink on {args.inject_host}: {remote_inject_dir}", file=sys.stderr) print(f"starting synthetic uplink on {args.inject_host}: {remote_inject_dir}", file=sys.stderr)
return subprocess.Popen(["ssh", args.inject_host, " ".join(shlex.quote(part) for part in inject_cmd)]) return subprocess.Popen(["ssh", args.inject_host, " ".join(shlex.quote(part) for part in inject_cmd)])
@ -304,7 +311,13 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int:
local_inject = artifact_dir / "inject" local_inject = artifact_dir / "inject"
if capture is not None: if capture is not None:
subprocess.run(["scp", "-r", f"{args.rct_host}:{remote_rct_dir}", str(local_capture)], check=False) subprocess.run(["scp", "-r", f"{args.rct_host}:{remote_rct_dir}", str(local_capture)], check=False)
subprocess.run(["scp", "-r", f"{args.inject_host}:{remote_inject_dir}", str(local_inject)], check=False) if args.local_inject:
if pathlib.Path(remote_inject_dir).exists():
if local_inject.exists():
shutil.rmtree(local_inject)
shutil.copytree(remote_inject_dir, local_inject)
else:
subprocess.run(["scp", "-r", f"{args.inject_host}:{remote_inject_dir}", str(local_inject)], check=False)
capture_summary = local_capture / "summary.json" capture_summary = local_capture / "summary.json"
if capture_summary.exists(): if capture_summary.exists():
try: try:
@ -334,12 +347,23 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int:
try: try:
inject_data = json.loads(inject_summary.read_text()) inject_data = json.loads(inject_summary.read_text())
oversize_frames = int(inject_data.get("encoded_oversize_frames") or 0) oversize_frames = int(inject_data.get("encoded_oversize_frames") or 0)
sent_frames = int(inject_data.get("sent_frames") or 0)
encoded_frames = int(inject_data.get("encoded_frames") or 0)
exit_reason = str(inject_data.get("exit_reason") or "")
max_bytes = inject_data.get("encoded_max_bytes") max_bytes = inject_data.get("encoded_max_bytes")
max_frame_bytes = inject_data.get("max_frame_bytes") max_frame_bytes = inject_data.get("max_frame_bytes")
if oversize_frames: if oversize_frames:
diagnosis.append( diagnosis.append(
f"synthetic injector produced {oversize_frames} over-budget MJPEG frame(s), max={max_bytes} cap={max_frame_bytes}; the server will freeze instead of spooling those frames" f"synthetic injector produced {oversize_frames} over-budget MJPEG frame(s), max={max_bytes} cap={max_frame_bytes}; the server will freeze instead of spooling those frames"
) )
if inject_rc != 0 and "StreamWebcamMedia closed before accepting synthetic frame" in exit_reason:
diagnosis.append(
f"synthetic injector was preempted after sending {sent_frames} frame(s); disconnect/pause the live Lesavka client upstream before running this isolated probe"
)
elif inject_rc != 0 and encoded_frames > 0 and not oversize_frames:
diagnosis.append(
f"synthetic injector encoded {encoded_frames} in-budget frame(s) before failing; inspect inject/summary.json exit_reason for the stream-close cause"
)
except Exception: except Exception:
pass pass
summary = { summary = {

View File

@ -16,7 +16,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.22.51" version = "0.22.52"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -130,6 +130,7 @@ struct MjpegEncoder {
#[derive(Clone, Copy, Debug, Default)] #[derive(Clone, Copy, Debug, Default)]
struct EncodeStats { struct EncodeStats {
frames: u64, frames: u64,
sent_frames: u64,
total_bytes: u128, total_bytes: u128,
min_bytes: usize, min_bytes: usize,
max_bytes: usize, max_bytes: usize,
@ -151,6 +152,10 @@ impl EncodeStats {
} }
} }
fn record_sent(&mut self) {
self.sent_frames = self.sent_frames.saturating_add(1);
}
fn mean_bytes(&self) -> usize { fn mean_bytes(&self) -> usize {
if self.frames == 0 { if self.frames == 0 {
0 0
@ -352,7 +357,7 @@ async fn main() -> Result<()> {
dir.join("command.txt"), dir.join("command.txt"),
std::env::args().collect::<Vec<_>>().join(" ") + "\n", std::env::args().collect::<Vec<_>>().join(" ") + "\n",
)?; )?;
write_summary(&args, None)?; write_summary(&args, None, None)?;
} }
let channel = connect_channel(&args).await?; let channel = connect_channel(&args).await?;
@ -388,7 +393,11 @@ async fn main() -> Result<()> {
let encoded = encoder.encode(sequence)?; let encoded = encoder.encode(sequence)?;
encode_stats.record(encoded.len(), args.max_frame_bytes); encode_stats.record(encoded.len(), args.max_frame_bytes);
if args.max_frame_bytes > 0 && encoded.len() > args.max_frame_bytes { if args.max_frame_bytes > 0 && encoded.len() > args.max_frame_bytes {
write_summary(&args, Some(&encode_stats))?; write_summary(
&args,
Some(&encode_stats),
Some("encoded_frame_over_budget"),
)?;
bail!( bail!(
"encoded synthetic frame {sequence} is {} bytes, above --max-frame-bytes {}; lower --jpeg-quality or use a more compressible synthetic pattern before trusting the RCT probe", "encoded synthetic frame {sequence} is {} bytes, above --max-frame-bytes {}; lower --jpeg-quality or use a more compressible synthetic pattern before trusting the RCT probe",
encoded.len(), encoded.len(),
@ -397,19 +406,28 @@ async fn main() -> Result<()> {
} }
let bundle = synthetic_bundle(&args, sequence, pts_us, encoded); let bundle = synthetic_bundle(&args, sequence, pts_us, encoded);
if tx.send(bundle).await.is_err() { if tx.send(bundle).await.is_err() {
let closed_reason = format!(
"StreamWebcamMedia closed before accepting synthetic frame {sequence}; disconnect or pause any live Lesavka client upstream before running the isolated RCT probe"
);
let response_result = response_task let response_result = response_task
.await .await
.context("joining StreamWebcamMedia task after early close")?; .context("joining StreamWebcamMedia task after early close")?;
match response_result { match response_result {
Ok(()) => bail!( Ok(()) => {
"StreamWebcamMedia closed before accepting synthetic frame {sequence}; disconnect or pause any live Lesavka client upstream before running the isolated RCT probe" write_summary(&args, Some(&encode_stats), Some(&closed_reason))?;
), bail!("{closed_reason}");
}
Err(err) => { Err(err) => {
let error_reason = format!(
"StreamWebcamMedia closed before accepting synthetic frame {sequence}: {err:#}"
);
write_summary(&args, Some(&encode_stats), Some(&error_reason))?;
return Err(err) return Err(err)
.context("StreamWebcamMedia closed before accepting synthetic frame"); .context("StreamWebcamMedia closed before accepting synthetic frame");
} }
} }
} }
encode_stats.record_sent();
if args.print_every > 0 && sequence > 0 && sequence % args.print_every == 0 { if args.print_every > 0 && sequence > 0 && sequence % args.print_every == 0 {
eprintln!("sent synthetic frame {sequence}/{total_frames}"); eprintln!("sent synthetic frame {sequence}/{total_frames}");
} }
@ -418,7 +436,7 @@ async fn main() -> Result<()> {
response_task response_task
.await .await
.context("joining StreamWebcamMedia task")??; .context("joining StreamWebcamMedia task")??;
write_summary(&args, Some(&encode_stats))?; write_summary(&args, Some(&encode_stats), Some("complete"))?;
eprintln!("lesavka synthetic uplink complete: frames={total_frames}"); eprintln!("lesavka synthetic uplink complete: frames={total_frames}");
Ok(()) Ok(())
} }
@ -642,11 +660,15 @@ fn unix_millis() -> u64 {
.min(u128::from(u64::MAX)) as u64 .min(u128::from(u64::MAX)) as u64
} }
fn write_summary(args: &Args, stats: Option<&EncodeStats>) -> Result<()> { fn write_summary(
args: &Args,
stats: Option<&EncodeStats>,
exit_reason: Option<&str>,
) -> Result<()> {
if let Some(dir) = &args.artifact_dir { if let Some(dir) = &args.artifact_dir {
std::fs::write( std::fs::write(
dir.join("summary.json"), dir.join("summary.json"),
args_summary_json(args, stats) + "\n", args_summary_json(args, stats, exit_reason) + "\n",
)?; )?;
} }
Ok(()) Ok(())
@ -658,8 +680,19 @@ fn json_usize_or_null(value: Option<usize>) -> String {
.unwrap_or_else(|| "null".to_string()) .unwrap_or_else(|| "null".to_string())
} }
fn args_summary_json(args: &Args, stats: Option<&EncodeStats>) -> String { fn json_string_or_null(value: Option<&str>) -> String {
value
.map(|value| format!("{value:?}"))
.unwrap_or_else(|| "null".to_string())
}
fn args_summary_json(
args: &Args,
stats: Option<&EncodeStats>,
exit_reason: Option<&str>,
) -> String {
let frames = stats.map(|stats| stats.frames).unwrap_or(0); let frames = stats.map(|stats| stats.frames).unwrap_or(0);
let sent_frames = stats.map(|stats| stats.sent_frames).unwrap_or(0);
let min_bytes = let min_bytes =
json_usize_or_null(stats.and_then(|stats| (stats.frames > 0).then_some(stats.min_bytes))); json_usize_or_null(stats.and_then(|stats| (stats.frames > 0).then_some(stats.min_bytes)));
let max_bytes = let max_bytes =
@ -668,8 +701,9 @@ fn args_summary_json(args: &Args, stats: Option<&EncodeStats>) -> String {
stats.and_then(|stats| (stats.frames > 0).then_some(stats.mean_bytes())), stats.and_then(|stats| (stats.frames > 0).then_some(stats.mean_bytes())),
); );
let oversize_frames = stats.map(|stats| stats.oversize_frames).unwrap_or(0); let oversize_frames = stats.map(|stats| stats.oversize_frames).unwrap_or(0);
let exit_reason = json_string_or_null(exit_reason);
format!( format!(
"{{\"schema\":\"lesavka.synthetic-uplink.v1\",\"server\":{server:?},\"width\":{width},\"height\":{height},\"fps\":{fps},\"duration_s\":{duration:.3},\"session_id\":{session},\"tls\":{tls},\"jpeg_quality\":{quality},\"max_frame_bytes\":{max_frame_bytes},\"encoded_frames\":{frames},\"encoded_min_bytes\":{min_bytes},\"encoded_max_bytes\":{max_bytes},\"encoded_mean_bytes\":{mean_bytes},\"encoded_oversize_frames\":{oversize_frames}}}", "{{\"schema\":\"lesavka.synthetic-uplink.v1\",\"server\":{server:?},\"width\":{width},\"height\":{height},\"fps\":{fps},\"duration_s\":{duration:.3},\"session_id\":{session},\"tls\":{tls},\"jpeg_quality\":{quality},\"max_frame_bytes\":{max_frame_bytes},\"encoded_frames\":{frames},\"sent_frames\":{sent_frames},\"encoded_min_bytes\":{min_bytes},\"encoded_max_bytes\":{max_bytes},\"encoded_mean_bytes\":{mean_bytes},\"encoded_oversize_frames\":{oversize_frames},\"exit_reason\":{exit_reason}}}",
server = args.server, server = args.server,
width = args.width, width = args.width,
height = args.height, height = args.height,
@ -680,10 +714,12 @@ fn args_summary_json(args: &Args, stats: Option<&EncodeStats>) -> String {
quality = args.jpeg_quality, quality = args.jpeg_quality,
max_frame_bytes = args.max_frame_bytes, max_frame_bytes = args.max_frame_bytes,
frames = frames, frames = frames,
sent_frames = sent_frames,
min_bytes = min_bytes, min_bytes = min_bytes,
max_bytes = max_bytes, max_bytes = max_bytes,
mean_bytes = mean_bytes, mean_bytes = mean_bytes,
oversize_frames = oversize_frames, oversize_frames = oversize_frames,
exit_reason = exit_reason,
) )
} }

View File

@ -35,6 +35,7 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() {
"lesavka-synthetic-uplink", "lesavka-synthetic-uplink",
"https://127.0.0.1:50051", "https://127.0.0.1:50051",
"--inject-host", "--inject-host",
"--local-inject",
"--rct-host", "--rct-host",
"--capture-only", "--capture-only",
"--capture-before-inject", "--capture-before-inject",
@ -57,8 +58,11 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() {
"decoded_pct", "decoded_pct",
"diagnosis", "diagnosis",
"encoded_oversize_frames", "encoded_oversize_frames",
"sent_frames",
"exit_reason",
"decoded_sequence_counts", "decoded_sequence_counts",
"synthetic uplink completed but RCT capture did not finish", "synthetic uplink completed but RCT capture did not finish",
"synthetic injector was preempted after sending",
"synthetic uplink exited before capture warmup completed", "synthetic uplink exited before capture warmup completed",
"max_lower_mae", "max_lower_mae",
"ffmpeg", "ffmpeg",