media: bound UAC backlog and harden synthetic probe
This commit is contained in:
parent
1473660f68
commit
4414606d26
@ -573,6 +573,7 @@ These entries are intentionally concise because most are manual lab or CI harnes
|
||||
| `LESAVKA_UAC_APP_MAX_BUFFERS` | server UAC appsrc buffering override for lab tuning of microphone gadget output latency and stability |
|
||||
| `LESAVKA_UAC_APP_MAX_BYTES` | server UAC appsrc buffering override for lab tuning of microphone gadget output latency and stability |
|
||||
| `LESAVKA_UAC_APP_MAX_TIME_NS` | server UAC appsrc buffering override for lab tuning of microphone gadget output latency and stability |
|
||||
| `LESAVKA_UAC_APP_LEAKY_TYPE` | server UAC appsrc live-backlog policy (`downstream`, `upstream`, or `none`); defaults to `downstream` so a slow RCT audio reader drops stale audio instead of growing RSS |
|
||||
| `LESAVKA_MIC_NOISE_SUPPRESSION` | client microphone capture toggle; when truthy, inserts WebRTC DSP noise suppression before upstream audio transport |
|
||||
| `LESAVKA_UPLINK_AUDIO_CODEC` | client/server upstream microphone transport hint (`opus` or `pcm`); launcher and installer default to PCM until Opus calibration is intentionally selected |
|
||||
| `LESAVKA_UPLINK_CAMERA_CODEC` | server camera ingress codec hint; records whether upstream camera media arrives as `mjpeg`, `h264`, or `hevc` before UVC output |
|
||||
|
||||
@ -1566,6 +1566,7 @@ SERVER_ENV_TMP=$(mktemp)
|
||||
printf 'LESAVKA_UAC_HOST_MIXER_CONTROLS=%s\n' "${LESAVKA_UAC_HOST_MIXER_CONTROLS:-0}"
|
||||
printf 'LESAVKA_UAC_HDMI_COMPENSATION_US=%s\n' "${LESAVKA_UAC_HDMI_COMPENSATION_US:-205000}"
|
||||
printf 'LESAVKA_UAC_SESSION_CLOCK_ALIGN=%s\n' "${LESAVKA_UAC_SESSION_CLOCK_ALIGN:-0}"
|
||||
printf 'LESAVKA_UAC_APP_LEAKY_TYPE=%s\n' "${LESAVKA_UAC_APP_LEAKY_TYPE:-downstream}"
|
||||
printf 'LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s\n' "${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-350}"
|
||||
printf 'LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS=%s\n' "${LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS:-1000}"
|
||||
printf 'LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS=%s\n' "${LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS:-60000}"
|
||||
|
||||
@ -46,6 +46,12 @@ def parse_args() -> argparse.Namespace:
|
||||
parser.add_argument("--artifact-dir", default="")
|
||||
parser.add_argument("--remote-rct-dir", default="")
|
||||
parser.add_argument("--remote-inject-dir", default="")
|
||||
parser.add_argument(
|
||||
"--capture-before-inject",
|
||||
action="store_true",
|
||||
help="start RCT capture before synthetic uplink; default starts uplink first so superseded injectors fail fast",
|
||||
)
|
||||
parser.add_argument("--inject-warmup-s", type=float, default=1.25)
|
||||
parser.add_argument("--x-step", type=int, default=8)
|
||||
parser.add_argument("--y-step", type=int, default=4)
|
||||
parser.add_argument("--bands", type=int, default=24)
|
||||
@ -199,22 +205,83 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int:
|
||||
)
|
||||
+ "\n"
|
||||
)
|
||||
print(f"starting RCT capture on {args.rct_host}: {remote_rct_dir}", file=sys.stderr)
|
||||
capture = subprocess.Popen(["ssh", args.rct_host, " ".join(shlex.quote(part) for part in capture_cmd)])
|
||||
time.sleep(1.0)
|
||||
print(f"starting synthetic uplink on {args.inject_host}: {remote_inject_dir}", file=sys.stderr)
|
||||
inject = subprocess.Popen(["ssh", args.inject_host, " ".join(shlex.quote(part) for part in inject_cmd)])
|
||||
inject_rc = inject.wait()
|
||||
capture_rc = capture.wait()
|
||||
def start_capture() -> subprocess.Popen[Any]:
|
||||
print(f"starting RCT capture on {args.rct_host}: {remote_rct_dir}", file=sys.stderr)
|
||||
return subprocess.Popen(["ssh", args.rct_host, " ".join(shlex.quote(part) for part in capture_cmd)])
|
||||
|
||||
def start_inject() -> subprocess.Popen[Any]:
|
||||
print(f"starting synthetic uplink on {args.inject_host}: {remote_inject_dir}", file=sys.stderr)
|
||||
return subprocess.Popen(["ssh", args.inject_host, " ".join(shlex.quote(part) for part in inject_cmd)])
|
||||
|
||||
def stop_capture(process: subprocess.Popen[Any]) -> int | None:
|
||||
process.terminate()
|
||||
try:
|
||||
return process.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
process.kill()
|
||||
return process.wait()
|
||||
|
||||
def wait_capture_or_inject_exit(
|
||||
capture_process: subprocess.Popen[Any], inject_process: subprocess.Popen[Any]
|
||||
) -> tuple[int | None, int | None]:
|
||||
while True:
|
||||
capture_status = capture_process.poll()
|
||||
if capture_status is not None:
|
||||
return capture_status, inject_process.wait()
|
||||
inject_status = inject_process.poll()
|
||||
if inject_status is not None:
|
||||
diagnosis.append(
|
||||
"synthetic uplink exited while RCT capture was still active; stopping capture because the run is not isolated"
|
||||
)
|
||||
print(
|
||||
f"synthetic uplink exited during capture rc={inject_status}; stopping RCT capture",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return stop_capture(capture_process), inject_status
|
||||
time.sleep(0.25)
|
||||
|
||||
capture: subprocess.Popen[Any] | None = None
|
||||
diagnosis: list[str] = []
|
||||
if args.capture_before_inject:
|
||||
capture = start_capture()
|
||||
time.sleep(1.0)
|
||||
inject = start_inject()
|
||||
capture_rc, inject_rc = wait_capture_or_inject_exit(capture, inject)
|
||||
else:
|
||||
inject = start_inject()
|
||||
time.sleep(max(0.0, args.inject_warmup_s))
|
||||
inject_rc = inject.poll()
|
||||
if inject_rc is not None:
|
||||
capture_rc = None
|
||||
diagnosis.append(
|
||||
"synthetic uplink exited before capture warmup completed; disconnect the live client or pause upstream webcam before running the isolated probe"
|
||||
)
|
||||
print(f"synthetic uplink exited before capture started rc={inject_rc}", file=sys.stderr)
|
||||
else:
|
||||
capture = start_capture()
|
||||
capture_rc, inject_rc = wait_capture_or_inject_exit(capture, inject)
|
||||
local_capture = artifact_dir / "capture"
|
||||
local_inject = artifact_dir / "inject"
|
||||
subprocess.run(["scp", "-r", f"{args.rct_host}:{remote_rct_dir}", str(local_capture)], check=False)
|
||||
if capture is not None:
|
||||
subprocess.run(["scp", "-r", f"{args.rct_host}:{remote_rct_dir}", str(local_capture)], check=False)
|
||||
subprocess.run(["scp", "-r", f"{args.inject_host}:{remote_inject_dir}", str(local_inject)], check=False)
|
||||
capture_summary = local_capture / "summary.json"
|
||||
if capture_summary.exists():
|
||||
try:
|
||||
capture_data = json.loads(capture_summary.read_text())
|
||||
decoded_pct = float(capture_data.get("decoded_pct") or 0.0)
|
||||
if inject_rc != 0 and decoded_pct < 80.0:
|
||||
diagnosis.append(
|
||||
"captured frames did not consistently contain synthetic markers and the injector failed; the RCT capture likely measured a mixed, previous, or live webcam stream"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
summary = {
|
||||
"schema": "lesavka.synthetic-rct-probe.orchestrator.v1",
|
||||
"mode": args.mode,
|
||||
"capture_rc": capture_rc,
|
||||
"inject_rc": inject_rc,
|
||||
"diagnosis": diagnosis,
|
||||
"artifact_dir": str(artifact_dir),
|
||||
"capture_artifacts": str(local_capture),
|
||||
"inject_artifacts": str(local_inject),
|
||||
|
||||
@ -149,6 +149,14 @@ fn voice_appsrc_max_time_ns() -> u64 {
|
||||
positive_voice_appsrc_limit_env("LESAVKA_UAC_APP_MAX_TIME_NS", 200_000_000)
|
||||
}
|
||||
|
||||
fn voice_appsrc_leaky_type() -> String {
|
||||
std::env::var("LESAVKA_UAC_APP_LEAKY_TYPE")
|
||||
.ok()
|
||||
.map(|value| value.trim().to_ascii_lowercase())
|
||||
.filter(|value| matches!(value.as_str(), "downstream" | "upstream" | "none"))
|
||||
.unwrap_or_else(|| "downstream".to_string())
|
||||
}
|
||||
|
||||
fn positive_voice_appsrc_limit_env(name: &str, default: u64) -> u64 {
|
||||
std::env::var(name)
|
||||
.ok()
|
||||
@ -174,7 +182,7 @@ fn configure_voice_appsrc(appsrc: &gst_app::AppSrc) {
|
||||
appsrc.set_property("max-time", voice_appsrc_max_time_ns());
|
||||
}
|
||||
if appsrc.has_property("leaky-type", None) {
|
||||
appsrc.set_property_from_str("leaky-type", "none");
|
||||
appsrc.set_property_from_str("leaky-type", &voice_appsrc_leaky_type());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,7 +1,10 @@
|
||||
use crate::camera::update_camera_config;
|
||||
use super::{voice_sink_buffer_time_us, voice_sink_latency_time_us};
|
||||
use super::{default_voice_sink_compensation_us, voice_sink_compensation_us};
|
||||
use super::{voice_appsrc_max_buffers, voice_appsrc_max_bytes, voice_appsrc_max_time_ns};
|
||||
use super::{
|
||||
voice_appsrc_leaky_type, voice_appsrc_max_buffers, voice_appsrc_max_bytes,
|
||||
voice_appsrc_max_time_ns,
|
||||
};
|
||||
|
||||
#[test]
|
||||
/// Keeps `voice_sink_timing_defaults_stay_live_call_friendly` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose.
|
||||
@ -61,6 +64,25 @@ use crate::camera::update_camera_config;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn voice_appsrc_defaults_to_leaky_live_audio_queue() {
|
||||
temp_env::with_var_unset("LESAVKA_UAC_APP_LEAKY_TYPE", || {
|
||||
assert_eq!(voice_appsrc_leaky_type(), "downstream");
|
||||
});
|
||||
|
||||
temp_env::with_var("LESAVKA_UAC_APP_LEAKY_TYPE", Some("upstream"), || {
|
||||
assert_eq!(voice_appsrc_leaky_type(), "upstream");
|
||||
});
|
||||
|
||||
temp_env::with_var("LESAVKA_UAC_APP_LEAKY_TYPE", Some("none"), || {
|
||||
assert_eq!(voice_appsrc_leaky_type(), "none");
|
||||
});
|
||||
|
||||
temp_env::with_var("LESAVKA_UAC_APP_LEAKY_TYPE", Some("bogus"), || {
|
||||
assert_eq!(voice_appsrc_leaky_type(), "downstream");
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// Keeps `voice_sink_timing_env_accepts_positive_overrides_only` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose.
|
||||
/// Inputs are the typed parameters; output is the return value or side effect.
|
||||
|
||||
@ -254,7 +254,20 @@ async fn main() -> Result<()> {
|
||||
let pts_us = sequence.saturating_mul(args.frame_step_us());
|
||||
let encoded = encoder.encode(sequence)?;
|
||||
let bundle = synthetic_bundle(&args, sequence, pts_us, encoded);
|
||||
tx.send(bundle).await.context("sending synthetic bundle")?;
|
||||
if tx.send(bundle).await.is_err() {
|
||||
let response_result = response_task
|
||||
.await
|
||||
.context("joining StreamWebcamMedia task after early close")?;
|
||||
match response_result {
|
||||
Ok(()) => bail!(
|
||||
"StreamWebcamMedia closed before accepting synthetic frame {sequence}; disconnect or pause any live Lesavka client upstream before running the isolated RCT probe"
|
||||
),
|
||||
Err(err) => {
|
||||
return Err(err)
|
||||
.context("StreamWebcamMedia closed before accepting synthetic frame");
|
||||
}
|
||||
}
|
||||
}
|
||||
if args.print_every > 0 && sequence > 0 && sequence % args.print_every == 0 {
|
||||
eprintln!("sent synthetic frame {sequence}/{total_frames}");
|
||||
}
|
||||
|
||||
@ -148,12 +148,18 @@ mod server_upstream_media_audio_normal_mode {
|
||||
env!("CARGO_MANIFEST_DIR"),
|
||||
"/server/src/upstream_media_runtime.rs"
|
||||
));
|
||||
const CALIBRATION_OFFSETS: &str = include_str!(concat!(
|
||||
env!("CARGO_MANIFEST_DIR"),
|
||||
"/server/src/calibration/profile_offsets.rs"
|
||||
));
|
||||
|
||||
#[test]
|
||||
fn microphone_rpc_stays_wired_to_voice_sink_runtime() {
|
||||
assert!(RELAY_TRAIT.contains("stream_microphone"));
|
||||
assert!(MICROPHONE_RPC.contains("open_voice_with_retry"));
|
||||
assert!(AUDIO_SINK.contains("LESAVKA_UAC_BUFFER_TIME_US"));
|
||||
assert!(RUNTIME.contains("LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US"));
|
||||
assert!(AUDIO_SINK.contains("LESAVKA_UAC_APP_LEAKY_TYPE"));
|
||||
assert!(RUNTIME.contains("configured_playout_offset_us"));
|
||||
assert!(CALIBRATION_OFFSETS.contains("LESAVKA_UPSTREAM_{media}_PLAYOUT_OFFSET_US"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,6 +51,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
|
||||
"LESAVKA_UVC_FRAME_META_LOG_PATH=%s",
|
||||
"LESAVKA_UVC_CONTROL_READ_ONLY=",
|
||||
"LESAVKA_UAC_HOST_MIXER_CONTROLS=%s",
|
||||
"LESAVKA_UAC_APP_LEAKY_TYPE=%s",
|
||||
"LESAVKA_REQUIRE_TLS=%s",
|
||||
"LESAVKA_TLS_CERT=%s",
|
||||
"LESAVKA_TLS_KEY=%s",
|
||||
@ -94,6 +95,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
|
||||
assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_HEIGHT:-1080}"));
|
||||
assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_SINK:-fbdevsink}"));
|
||||
assert!(SERVER_INSTALL.contains("${LESAVKA_UAC_HOST_MIXER_CONTROLS:-0}"));
|
||||
assert!(SERVER_INSTALL.contains("${LESAVKA_UAC_APP_LEAKY_TYPE:-downstream}"));
|
||||
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-350}"));
|
||||
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS:-1000}"));
|
||||
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS:-60000}"));
|
||||
|
||||
@ -36,6 +36,8 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() {
|
||||
"--inject-host",
|
||||
"--rct-host",
|
||||
"--capture-only",
|
||||
"--capture-before-inject",
|
||||
"--inject-warmup-s",
|
||||
"--source",
|
||||
"--mode",
|
||||
"1280x720@20,1280x720@30,1920x1080@20,1920x1080@30",
|
||||
@ -49,6 +51,8 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() {
|
||||
"expected_",
|
||||
"suspicious_",
|
||||
"decoded_pct",
|
||||
"diagnosis",
|
||||
"synthetic uplink exited before capture warmup completed",
|
||||
"max_lower_mae",
|
||||
"ffmpeg",
|
||||
"v4l2",
|
||||
@ -76,6 +80,7 @@ fn synthetic_injector_enters_the_public_bundled_media_rpc() {
|
||||
"jpegenc",
|
||||
"client_capture_pts_us: pts_us",
|
||||
"client_send_pts_us: pts_us",
|
||||
"StreamWebcamMedia closed before accepting synthetic frame",
|
||||
] {
|
||||
assert!(
|
||||
SERVER_CARGO.contains(expected) || INJECTOR_SRC.contains(expected),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user