diff --git a/Cargo.lock b/Cargo.lock index 5083fe5..9549099 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.51" +version = "0.22.52" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.51" +version = "0.22.52" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.51" +version = "0.22.52" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 4bcb078..e1f4e7b 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.51" +version = "0.22.52" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 70fd992..7e23786 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.51" +version = "0.22.52" edition = "2024" build = "build.rs" diff --git a/scripts/manual/run_synthetic_rct_uvc_probe.py b/scripts/manual/run_synthetic_rct_uvc_probe.py index 1e0ca8b..59149cf 100755 --- a/scripts/manual/run_synthetic_rct_uvc_probe.py +++ b/scripts/manual/run_synthetic_rct_uvc_probe.py @@ -34,6 +34,7 @@ def parse_args() -> argparse.Namespace: ) ) parser.add_argument("--inject-host", default="", help="Theia SSH host, e.g. titan-jh") + parser.add_argument("--local-inject", action="store_true", help="run the synthetic injector directly on this host") parser.add_argument("--rct-host", default="", help="RCT SSH host, e.g. tethys") parser.add_argument("--server", default="https://127.0.0.1:50051") parser.add_argument("--inject-binary", default="/usr/local/bin/lesavka-synthetic-uplink") @@ -124,8 +125,10 @@ def default_artifact_dir(mode: str) -> pathlib.Path: def run_remote_orchestrated(args: argparse.Namespace) -> int: - if not args.inject_host or not args.rct_host: - raise SystemExit("--inject-host and --rct-host are required unless --capture-only or --self-test is used") + if (not args.inject_host and not args.local_inject) or not args.rct_host: + raise SystemExit( + "--rct-host and either --inject-host or --local-inject are required unless --capture-only or --self-test is used" + ) if not shutil.which("ssh") or not shutil.which("scp"): raise SystemExit("ssh and scp are required for the remote synthetic probe") width, height, fps = mode_dimensions(args) @@ -227,6 +230,7 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int: "jpeg_quality": args.jpeg_quality, "inject_max_frame_bytes": inject_max_frame_bytes, "inject_host": args.inject_host, + "local_inject": args.local_inject, "rct_host": args.rct_host, }, indent=2, @@ -239,6 +243,9 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int: return subprocess.Popen(["ssh", args.rct_host, " ".join(shlex.quote(part) for part in capture_cmd)]) def start_inject() -> subprocess.Popen[Any]: + if args.local_inject: + print(f"starting local synthetic uplink: {remote_inject_dir}", file=sys.stderr) + return subprocess.Popen(inject_cmd) print(f"starting synthetic uplink on {args.inject_host}: {remote_inject_dir}", file=sys.stderr) return subprocess.Popen(["ssh", args.inject_host, " ".join(shlex.quote(part) for part in inject_cmd)]) @@ -304,7 +311,13 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int: local_inject = artifact_dir / "inject" if capture is not None: subprocess.run(["scp", "-r", f"{args.rct_host}:{remote_rct_dir}", str(local_capture)], check=False) - subprocess.run(["scp", "-r", f"{args.inject_host}:{remote_inject_dir}", str(local_inject)], check=False) + if args.local_inject: + if pathlib.Path(remote_inject_dir).exists(): + if local_inject.exists(): + shutil.rmtree(local_inject) + shutil.copytree(remote_inject_dir, local_inject) + else: + subprocess.run(["scp", "-r", f"{args.inject_host}:{remote_inject_dir}", str(local_inject)], check=False) capture_summary = local_capture / "summary.json" if capture_summary.exists(): try: @@ -334,12 +347,23 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int: try: inject_data = json.loads(inject_summary.read_text()) oversize_frames = int(inject_data.get("encoded_oversize_frames") or 0) + sent_frames = int(inject_data.get("sent_frames") or 0) + encoded_frames = int(inject_data.get("encoded_frames") or 0) + exit_reason = str(inject_data.get("exit_reason") or "") max_bytes = inject_data.get("encoded_max_bytes") max_frame_bytes = inject_data.get("max_frame_bytes") if oversize_frames: diagnosis.append( f"synthetic injector produced {oversize_frames} over-budget MJPEG frame(s), max={max_bytes} cap={max_frame_bytes}; the server will freeze instead of spooling those frames" ) + if inject_rc != 0 and "StreamWebcamMedia closed before accepting synthetic frame" in exit_reason: + diagnosis.append( + f"synthetic injector was preempted after sending {sent_frames} frame(s); disconnect/pause the live Lesavka client upstream before running this isolated probe" + ) + elif inject_rc != 0 and encoded_frames > 0 and not oversize_frames: + diagnosis.append( + f"synthetic injector encoded {encoded_frames} in-budget frame(s) before failing; inspect inject/summary.json exit_reason for the stream-close cause" + ) except Exception: pass summary = { diff --git a/server/Cargo.toml b/server/Cargo.toml index c64eb15..5e25276 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -16,7 +16,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.51" +version = "0.22.52" edition = "2024" autobins = false diff --git a/server/src/bin/lesavka-synthetic-uplink.rs b/server/src/bin/lesavka-synthetic-uplink.rs index 7663edd..15c9671 100755 --- a/server/src/bin/lesavka-synthetic-uplink.rs +++ b/server/src/bin/lesavka-synthetic-uplink.rs @@ -130,6 +130,7 @@ struct MjpegEncoder { #[derive(Clone, Copy, Debug, Default)] struct EncodeStats { frames: u64, + sent_frames: u64, total_bytes: u128, min_bytes: usize, max_bytes: usize, @@ -151,6 +152,10 @@ impl EncodeStats { } } + fn record_sent(&mut self) { + self.sent_frames = self.sent_frames.saturating_add(1); + } + fn mean_bytes(&self) -> usize { if self.frames == 0 { 0 @@ -352,7 +357,7 @@ async fn main() -> Result<()> { dir.join("command.txt"), std::env::args().collect::>().join(" ") + "\n", )?; - write_summary(&args, None)?; + write_summary(&args, None, None)?; } let channel = connect_channel(&args).await?; @@ -388,7 +393,11 @@ async fn main() -> Result<()> { let encoded = encoder.encode(sequence)?; encode_stats.record(encoded.len(), args.max_frame_bytes); if args.max_frame_bytes > 0 && encoded.len() > args.max_frame_bytes { - write_summary(&args, Some(&encode_stats))?; + write_summary( + &args, + Some(&encode_stats), + Some("encoded_frame_over_budget"), + )?; bail!( "encoded synthetic frame {sequence} is {} bytes, above --max-frame-bytes {}; lower --jpeg-quality or use a more compressible synthetic pattern before trusting the RCT probe", encoded.len(), @@ -397,19 +406,28 @@ async fn main() -> Result<()> { } let bundle = synthetic_bundle(&args, sequence, pts_us, encoded); if tx.send(bundle).await.is_err() { + let closed_reason = format!( + "StreamWebcamMedia closed before accepting synthetic frame {sequence}; disconnect or pause any live Lesavka client upstream before running the isolated RCT probe" + ); let response_result = response_task .await .context("joining StreamWebcamMedia task after early close")?; match response_result { - Ok(()) => bail!( - "StreamWebcamMedia closed before accepting synthetic frame {sequence}; disconnect or pause any live Lesavka client upstream before running the isolated RCT probe" - ), + Ok(()) => { + write_summary(&args, Some(&encode_stats), Some(&closed_reason))?; + bail!("{closed_reason}"); + } Err(err) => { + let error_reason = format!( + "StreamWebcamMedia closed before accepting synthetic frame {sequence}: {err:#}" + ); + write_summary(&args, Some(&encode_stats), Some(&error_reason))?; return Err(err) .context("StreamWebcamMedia closed before accepting synthetic frame"); } } } + encode_stats.record_sent(); if args.print_every > 0 && sequence > 0 && sequence % args.print_every == 0 { eprintln!("sent synthetic frame {sequence}/{total_frames}"); } @@ -418,7 +436,7 @@ async fn main() -> Result<()> { response_task .await .context("joining StreamWebcamMedia task")??; - write_summary(&args, Some(&encode_stats))?; + write_summary(&args, Some(&encode_stats), Some("complete"))?; eprintln!("lesavka synthetic uplink complete: frames={total_frames}"); Ok(()) } @@ -642,11 +660,15 @@ fn unix_millis() -> u64 { .min(u128::from(u64::MAX)) as u64 } -fn write_summary(args: &Args, stats: Option<&EncodeStats>) -> Result<()> { +fn write_summary( + args: &Args, + stats: Option<&EncodeStats>, + exit_reason: Option<&str>, +) -> Result<()> { if let Some(dir) = &args.artifact_dir { std::fs::write( dir.join("summary.json"), - args_summary_json(args, stats) + "\n", + args_summary_json(args, stats, exit_reason) + "\n", )?; } Ok(()) @@ -658,8 +680,19 @@ fn json_usize_or_null(value: Option) -> String { .unwrap_or_else(|| "null".to_string()) } -fn args_summary_json(args: &Args, stats: Option<&EncodeStats>) -> String { +fn json_string_or_null(value: Option<&str>) -> String { + value + .map(|value| format!("{value:?}")) + .unwrap_or_else(|| "null".to_string()) +} + +fn args_summary_json( + args: &Args, + stats: Option<&EncodeStats>, + exit_reason: Option<&str>, +) -> String { let frames = stats.map(|stats| stats.frames).unwrap_or(0); + let sent_frames = stats.map(|stats| stats.sent_frames).unwrap_or(0); let min_bytes = json_usize_or_null(stats.and_then(|stats| (stats.frames > 0).then_some(stats.min_bytes))); let max_bytes = @@ -668,8 +701,9 @@ fn args_summary_json(args: &Args, stats: Option<&EncodeStats>) -> String { stats.and_then(|stats| (stats.frames > 0).then_some(stats.mean_bytes())), ); let oversize_frames = stats.map(|stats| stats.oversize_frames).unwrap_or(0); + let exit_reason = json_string_or_null(exit_reason); format!( - "{{\"schema\":\"lesavka.synthetic-uplink.v1\",\"server\":{server:?},\"width\":{width},\"height\":{height},\"fps\":{fps},\"duration_s\":{duration:.3},\"session_id\":{session},\"tls\":{tls},\"jpeg_quality\":{quality},\"max_frame_bytes\":{max_frame_bytes},\"encoded_frames\":{frames},\"encoded_min_bytes\":{min_bytes},\"encoded_max_bytes\":{max_bytes},\"encoded_mean_bytes\":{mean_bytes},\"encoded_oversize_frames\":{oversize_frames}}}", + "{{\"schema\":\"lesavka.synthetic-uplink.v1\",\"server\":{server:?},\"width\":{width},\"height\":{height},\"fps\":{fps},\"duration_s\":{duration:.3},\"session_id\":{session},\"tls\":{tls},\"jpeg_quality\":{quality},\"max_frame_bytes\":{max_frame_bytes},\"encoded_frames\":{frames},\"sent_frames\":{sent_frames},\"encoded_min_bytes\":{min_bytes},\"encoded_max_bytes\":{max_bytes},\"encoded_mean_bytes\":{mean_bytes},\"encoded_oversize_frames\":{oversize_frames},\"exit_reason\":{exit_reason}}}", server = args.server, width = args.width, height = args.height, @@ -680,10 +714,12 @@ fn args_summary_json(args: &Args, stats: Option<&EncodeStats>) -> String { quality = args.jpeg_quality, max_frame_bytes = args.max_frame_bytes, frames = frames, + sent_frames = sent_frames, min_bytes = min_bytes, max_bytes = max_bytes, mean_bytes = mean_bytes, oversize_frames = oversize_frames, + exit_reason = exit_reason, ) } diff --git a/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs b/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs index ff62034..8c95e49 100644 --- a/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs +++ b/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs @@ -35,6 +35,7 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() { "lesavka-synthetic-uplink", "https://127.0.0.1:50051", "--inject-host", + "--local-inject", "--rct-host", "--capture-only", "--capture-before-inject", @@ -57,8 +58,11 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() { "decoded_pct", "diagnosis", "encoded_oversize_frames", + "sent_frames", + "exit_reason", "decoded_sequence_counts", "synthetic uplink completed but RCT capture did not finish", + "synthetic injector was preempted after sending", "synthetic uplink exited before capture warmup completed", "max_lower_mae", "ffmpeg",