diff --git a/docs/operational-env.md b/docs/operational-env.md index f221c50..64f123b 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -573,6 +573,7 @@ These entries are intentionally concise because most are manual lab or CI harnes | `LESAVKA_UAC_APP_MAX_BUFFERS` | server UAC appsrc buffering override for lab tuning of microphone gadget output latency and stability | | `LESAVKA_UAC_APP_MAX_BYTES` | server UAC appsrc buffering override for lab tuning of microphone gadget output latency and stability | | `LESAVKA_UAC_APP_MAX_TIME_NS` | server UAC appsrc buffering override for lab tuning of microphone gadget output latency and stability | +| `LESAVKA_UAC_APP_LEAKY_TYPE` | server UAC appsrc live-backlog policy (`downstream`, `upstream`, or `none`); defaults to `downstream` so a slow RCT audio reader drops stale audio instead of growing RSS | | `LESAVKA_MIC_NOISE_SUPPRESSION` | client microphone capture toggle; when truthy, inserts WebRTC DSP noise suppression before upstream audio transport | | `LESAVKA_UPLINK_AUDIO_CODEC` | client/server upstream microphone transport hint (`opus` or `pcm`); launcher and installer default to PCM until Opus calibration is intentionally selected | | `LESAVKA_UPLINK_CAMERA_CODEC` | server camera ingress codec hint; records whether upstream camera media arrives as `mjpeg`, `h264`, or `hevc` before UVC output | diff --git a/scripts/install/server.sh b/scripts/install/server.sh index c6faa0f..9edd930 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -1566,6 +1566,7 @@ SERVER_ENV_TMP=$(mktemp) printf 'LESAVKA_UAC_HOST_MIXER_CONTROLS=%s\n' "${LESAVKA_UAC_HOST_MIXER_CONTROLS:-0}" printf 'LESAVKA_UAC_HDMI_COMPENSATION_US=%s\n' "${LESAVKA_UAC_HDMI_COMPENSATION_US:-205000}" printf 'LESAVKA_UAC_SESSION_CLOCK_ALIGN=%s\n' "${LESAVKA_UAC_SESSION_CLOCK_ALIGN:-0}" + printf 'LESAVKA_UAC_APP_LEAKY_TYPE=%s\n' "${LESAVKA_UAC_APP_LEAKY_TYPE:-downstream}" printf 'LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s\n' "${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-350}" printf 'LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS=%s\n' "${LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS:-1000}" printf 'LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS=%s\n' "${LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS:-60000}" diff --git a/scripts/manual/run_synthetic_rct_uvc_probe.py b/scripts/manual/run_synthetic_rct_uvc_probe.py index daf1f62..e058774 100755 --- a/scripts/manual/run_synthetic_rct_uvc_probe.py +++ b/scripts/manual/run_synthetic_rct_uvc_probe.py @@ -46,6 +46,12 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--artifact-dir", default="") parser.add_argument("--remote-rct-dir", default="") parser.add_argument("--remote-inject-dir", default="") + parser.add_argument( + "--capture-before-inject", + action="store_true", + help="start RCT capture before synthetic uplink; default starts uplink first so superseded injectors fail fast", + ) + parser.add_argument("--inject-warmup-s", type=float, default=1.25) parser.add_argument("--x-step", type=int, default=8) parser.add_argument("--y-step", type=int, default=4) parser.add_argument("--bands", type=int, default=24) @@ -199,22 +205,83 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int: ) + "\n" ) - print(f"starting RCT capture on {args.rct_host}: {remote_rct_dir}", file=sys.stderr) - capture = subprocess.Popen(["ssh", args.rct_host, " ".join(shlex.quote(part) for part in capture_cmd)]) - time.sleep(1.0) - print(f"starting synthetic uplink on {args.inject_host}: {remote_inject_dir}", file=sys.stderr) - inject = subprocess.Popen(["ssh", args.inject_host, " ".join(shlex.quote(part) for part in inject_cmd)]) - inject_rc = inject.wait() - capture_rc = capture.wait() + def start_capture() -> subprocess.Popen[Any]: + print(f"starting RCT capture on {args.rct_host}: {remote_rct_dir}", file=sys.stderr) + return subprocess.Popen(["ssh", args.rct_host, " ".join(shlex.quote(part) for part in capture_cmd)]) + + def start_inject() -> subprocess.Popen[Any]: + print(f"starting synthetic uplink on {args.inject_host}: {remote_inject_dir}", file=sys.stderr) + return subprocess.Popen(["ssh", args.inject_host, " ".join(shlex.quote(part) for part in inject_cmd)]) + + def stop_capture(process: subprocess.Popen[Any]) -> int | None: + process.terminate() + try: + return process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + return process.wait() + + def wait_capture_or_inject_exit( + capture_process: subprocess.Popen[Any], inject_process: subprocess.Popen[Any] + ) -> tuple[int | None, int | None]: + while True: + capture_status = capture_process.poll() + if capture_status is not None: + return capture_status, inject_process.wait() + inject_status = inject_process.poll() + if inject_status is not None: + diagnosis.append( + "synthetic uplink exited while RCT capture was still active; stopping capture because the run is not isolated" + ) + print( + f"synthetic uplink exited during capture rc={inject_status}; stopping RCT capture", + file=sys.stderr, + ) + return stop_capture(capture_process), inject_status + time.sleep(0.25) + + capture: subprocess.Popen[Any] | None = None + diagnosis: list[str] = [] + if args.capture_before_inject: + capture = start_capture() + time.sleep(1.0) + inject = start_inject() + capture_rc, inject_rc = wait_capture_or_inject_exit(capture, inject) + else: + inject = start_inject() + time.sleep(max(0.0, args.inject_warmup_s)) + inject_rc = inject.poll() + if inject_rc is not None: + capture_rc = None + diagnosis.append( + "synthetic uplink exited before capture warmup completed; disconnect the live client or pause upstream webcam before running the isolated probe" + ) + print(f"synthetic uplink exited before capture started rc={inject_rc}", file=sys.stderr) + else: + capture = start_capture() + capture_rc, inject_rc = wait_capture_or_inject_exit(capture, inject) local_capture = artifact_dir / "capture" local_inject = artifact_dir / "inject" - subprocess.run(["scp", "-r", f"{args.rct_host}:{remote_rct_dir}", str(local_capture)], check=False) + if capture is not None: + subprocess.run(["scp", "-r", f"{args.rct_host}:{remote_rct_dir}", str(local_capture)], check=False) subprocess.run(["scp", "-r", f"{args.inject_host}:{remote_inject_dir}", str(local_inject)], check=False) + capture_summary = local_capture / "summary.json" + if capture_summary.exists(): + try: + capture_data = json.loads(capture_summary.read_text()) + decoded_pct = float(capture_data.get("decoded_pct") or 0.0) + if inject_rc != 0 and decoded_pct < 80.0: + diagnosis.append( + "captured frames did not consistently contain synthetic markers and the injector failed; the RCT capture likely measured a mixed, previous, or live webcam stream" + ) + except Exception: + pass summary = { "schema": "lesavka.synthetic-rct-probe.orchestrator.v1", "mode": args.mode, "capture_rc": capture_rc, "inject_rc": inject_rc, + "diagnosis": diagnosis, "artifact_dir": str(artifact_dir), "capture_artifacts": str(local_capture), "inject_artifacts": str(local_inject), diff --git a/server/src/audio/voice_input.rs b/server/src/audio/voice_input.rs index 7ff1be0..670b477 100644 --- a/server/src/audio/voice_input.rs +++ b/server/src/audio/voice_input.rs @@ -149,6 +149,14 @@ fn voice_appsrc_max_time_ns() -> u64 { positive_voice_appsrc_limit_env("LESAVKA_UAC_APP_MAX_TIME_NS", 200_000_000) } +fn voice_appsrc_leaky_type() -> String { + std::env::var("LESAVKA_UAC_APP_LEAKY_TYPE") + .ok() + .map(|value| value.trim().to_ascii_lowercase()) + .filter(|value| matches!(value.as_str(), "downstream" | "upstream" | "none")) + .unwrap_or_else(|| "downstream".to_string()) +} + fn positive_voice_appsrc_limit_env(name: &str, default: u64) -> u64 { std::env::var(name) .ok() @@ -174,7 +182,7 @@ fn configure_voice_appsrc(appsrc: &gst_app::AppSrc) { appsrc.set_property("max-time", voice_appsrc_max_time_ns()); } if appsrc.has_property("leaky-type", None) { - appsrc.set_property_from_str("leaky-type", "none"); + appsrc.set_property_from_str("leaky-type", &voice_appsrc_leaky_type()); } } diff --git a/server/src/audio/voice_input/tests/mod.rs b/server/src/audio/voice_input/tests/mod.rs index 0a467c9..bc8751b 100644 --- a/server/src/audio/voice_input/tests/mod.rs +++ b/server/src/audio/voice_input/tests/mod.rs @@ -1,7 +1,10 @@ use crate::camera::update_camera_config; use super::{voice_sink_buffer_time_us, voice_sink_latency_time_us}; use super::{default_voice_sink_compensation_us, voice_sink_compensation_us}; - use super::{voice_appsrc_max_buffers, voice_appsrc_max_bytes, voice_appsrc_max_time_ns}; + use super::{ + voice_appsrc_leaky_type, voice_appsrc_max_buffers, voice_appsrc_max_bytes, + voice_appsrc_max_time_ns, + }; #[test] /// Keeps `voice_sink_timing_defaults_stay_live_call_friendly` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose. @@ -61,6 +64,25 @@ use crate::camera::update_camera_config; }); } + #[test] + fn voice_appsrc_defaults_to_leaky_live_audio_queue() { + temp_env::with_var_unset("LESAVKA_UAC_APP_LEAKY_TYPE", || { + assert_eq!(voice_appsrc_leaky_type(), "downstream"); + }); + + temp_env::with_var("LESAVKA_UAC_APP_LEAKY_TYPE", Some("upstream"), || { + assert_eq!(voice_appsrc_leaky_type(), "upstream"); + }); + + temp_env::with_var("LESAVKA_UAC_APP_LEAKY_TYPE", Some("none"), || { + assert_eq!(voice_appsrc_leaky_type(), "none"); + }); + + temp_env::with_var("LESAVKA_UAC_APP_LEAKY_TYPE", Some("bogus"), || { + assert_eq!(voice_appsrc_leaky_type(), "downstream"); + }); + } + #[test] /// Keeps `voice_sink_timing_env_accepts_positive_overrides_only` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose. /// Inputs are the typed parameters; output is the return value or side effect. diff --git a/server/src/bin/lesavka-synthetic-uplink.rs b/server/src/bin/lesavka-synthetic-uplink.rs index 4dbf87c..30f2d9d 100755 --- a/server/src/bin/lesavka-synthetic-uplink.rs +++ b/server/src/bin/lesavka-synthetic-uplink.rs @@ -254,7 +254,20 @@ async fn main() -> Result<()> { let pts_us = sequence.saturating_mul(args.frame_step_us()); let encoded = encoder.encode(sequence)?; let bundle = synthetic_bundle(&args, sequence, pts_us, encoded); - tx.send(bundle).await.context("sending synthetic bundle")?; + if tx.send(bundle).await.is_err() { + let response_result = response_task + .await + .context("joining StreamWebcamMedia task after early close")?; + match response_result { + Ok(()) => bail!( + "StreamWebcamMedia closed before accepting synthetic frame {sequence}; disconnect or pause any live Lesavka client upstream before running the isolated RCT probe" + ), + Err(err) => { + return Err(err) + .context("StreamWebcamMedia closed before accepting synthetic frame"); + } + } + } if args.print_every > 0 && sequence > 0 && sequence % args.print_every == 0 { eprintln!("sent synthetic frame {sequence}/{total_frames}"); } diff --git a/tests/api/server/upstream_media_runtime/server_upstream_media_audio_contract.rs b/tests/api/server/upstream_media_runtime/server_upstream_media_audio_contract.rs index 5291aee..5b80cea 100644 --- a/tests/api/server/upstream_media_runtime/server_upstream_media_audio_contract.rs +++ b/tests/api/server/upstream_media_runtime/server_upstream_media_audio_contract.rs @@ -148,12 +148,18 @@ mod server_upstream_media_audio_normal_mode { env!("CARGO_MANIFEST_DIR"), "/server/src/upstream_media_runtime.rs" )); + const CALIBRATION_OFFSETS: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/server/src/calibration/profile_offsets.rs" + )); #[test] fn microphone_rpc_stays_wired_to_voice_sink_runtime() { assert!(RELAY_TRAIT.contains("stream_microphone")); assert!(MICROPHONE_RPC.contains("open_voice_with_retry")); assert!(AUDIO_SINK.contains("LESAVKA_UAC_BUFFER_TIME_US")); - assert!(RUNTIME.contains("LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US")); + assert!(AUDIO_SINK.contains("LESAVKA_UAC_APP_LEAKY_TYPE")); + assert!(RUNTIME.contains("configured_playout_offset_us")); + assert!(CALIBRATION_OFFSETS.contains("LESAVKA_UPSTREAM_{media}_PLAYOUT_OFFSET_US")); } } diff --git a/tests/installer/scripts/install/server_install_script_contract.rs b/tests/installer/scripts/install/server_install_script_contract.rs index f3a4071..c38e3ef 100644 --- a/tests/installer/scripts/install/server_install_script_contract.rs +++ b/tests/installer/scripts/install/server_install_script_contract.rs @@ -51,6 +51,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { "LESAVKA_UVC_FRAME_META_LOG_PATH=%s", "LESAVKA_UVC_CONTROL_READ_ONLY=", "LESAVKA_UAC_HOST_MIXER_CONTROLS=%s", + "LESAVKA_UAC_APP_LEAKY_TYPE=%s", "LESAVKA_REQUIRE_TLS=%s", "LESAVKA_TLS_CERT=%s", "LESAVKA_TLS_KEY=%s", @@ -94,6 +95,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_HEIGHT:-1080}")); assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_SINK:-fbdevsink}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UAC_HOST_MIXER_CONTROLS:-0}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UAC_APP_LEAKY_TYPE:-downstream}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-350}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS:-1000}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS:-60000}")); diff --git a/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs b/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs index d21cf5c..8c63611 100644 --- a/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs +++ b/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs @@ -36,6 +36,8 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() { "--inject-host", "--rct-host", "--capture-only", + "--capture-before-inject", + "--inject-warmup-s", "--source", "--mode", "1280x720@20,1280x720@30,1920x1080@20,1920x1080@30", @@ -49,6 +51,8 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() { "expected_", "suspicious_", "decoded_pct", + "diagnosis", + "synthetic uplink exited before capture warmup completed", "max_lower_mae", "ffmpeg", "v4l2", @@ -76,6 +80,7 @@ fn synthetic_injector_enters_the_public_bundled_media_rpc() { "jpegenc", "client_capture_pts_us: pts_us", "client_send_pts_us: pts_us", + "StreamWebcamMedia closed before accepting synthetic frame", ] { assert!( SERVER_CARGO.contains(expected) || INJECTOR_SRC.contains(expected),