fix(client): accept mjpeg webcam uplink sources

This commit is contained in:
Brad Stein 2026-04-22 08:07:09 -03:00
parent ef1b7ee3a7
commit c1212dcb86
13 changed files with 556 additions and 64 deletions

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.11.47"
version = "0.11.48"
edition = "2024"
[dependencies]

View File

@ -19,6 +19,13 @@ const CAMERA_PREVIEW_TAP_ENV: &str = "LESAVKA_UPLINK_CAMERA_PREVIEW";
const CAMERA_PREVIEW_WIDTH: i32 = 128;
const CAMERA_PREVIEW_HEIGHT: i32 = 72;
/// How the webcam's pre-encoder source format is selected.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CameraSourceProfile {
    // Pin the source to raw frames using the negotiated caps.
    Raw,
    // Pin the source to MJPEG and decode it before the encoder.
    Mjpeg,
    // Let decodebin negotiate either raw frames or MJPEG from the device.
    AutoDecode,
}
fn env_u32(name: &str, default: u32) -> u32 {
std::env::var(name)
.ok()
@ -80,41 +87,38 @@ impl CameraCapture {
}
};
let use_mjpg_source = allow_mjpg_source
&& (std::env::var("LESAVKA_CAM_MJPG").is_ok()
|| std::env::var("LESAVKA_CAM_FORMAT")
.ok()
.map(|v| matches!(v.to_ascii_lowercase().as_str(), "mjpg" | "mjpeg" | "jpeg"))
.unwrap_or(false));
let output_mjpeg = cfg
.map(|cfg| matches!(cfg.codec, CameraCodec::Mjpeg))
.unwrap_or_else(|| {
std::env::var("LESAVKA_CAM_CODEC")
.ok()
.map(|v| matches!(v.to_ascii_lowercase().as_str(), "mjpeg" | "mjpg" | "jpeg"))
.unwrap_or(false)
});
let output_mjpeg = cfg.map_or_else(
|| {
std::env::var("LESAVKA_CAM_CODEC").ok().is_some_and(|v| {
matches!(v.to_ascii_lowercase().as_str(), "mjpeg" | "mjpg" | "jpeg")
})
},
|cfg| matches!(cfg.codec, CameraCodec::Mjpeg),
);
let jpeg_quality = env_u32("LESAVKA_CAM_JPEG_QUALITY", 85).clamp(1, 100);
let width = cfg
.map(|cfg| cfg.width)
.unwrap_or_else(|| env_u32("LESAVKA_CAM_WIDTH", 1280));
let height = cfg
.map(|cfg| cfg.height)
.unwrap_or_else(|| env_u32("LESAVKA_CAM_HEIGHT", 720));
let width = cfg.map_or_else(|| env_u32("LESAVKA_CAM_WIDTH", 1280), |cfg| cfg.width);
let height = cfg.map_or_else(|| env_u32("LESAVKA_CAM_HEIGHT", 720), |cfg| cfg.height);
let fps = cfg
.map(|cfg| cfg.fps)
.unwrap_or_else(|| env_u32("LESAVKA_CAM_FPS", 25))
.map_or_else(|| env_u32("LESAVKA_CAM_FPS", 25), |cfg| cfg.fps)
.max(1);
let keyframe_interval = env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps);
let source_profile = camera_source_profile(allow_mjpg_source);
let use_mjpg_source = source_profile == CameraSourceProfile::Mjpeg;
let (enc, kf_prop) = if use_mjpg_source && !output_mjpeg {
("x264enc", Some("key-int-max"))
} else {
Self::choose_encoder()
};
if use_mjpg_source && !output_mjpeg {
tracing::info!("📸 using MJPG source with software encode");
match source_profile {
CameraSourceProfile::Mjpeg if !output_mjpeg => {
tracing::info!("📸 using MJPG source with software encode");
}
CameraSourceProfile::AutoDecode => {
tracing::info!("📸 using auto-decoded webcam source (raw/MJPEG accepted)");
}
_ => {}
}
let _enc_opts = Self::encoder_options(enc, kf_prop, keyframe_interval);
let enc_opts = Self::encoder_options(enc, kf_prop, keyframe_interval);
if output_mjpeg {
tracing::info!("📸 outputting MJPEG frames for UVC (quality={jpeg_quality})");
} else {
@ -159,6 +163,8 @@ impl CameraCapture {
// * vaapih264enc wants system-memory caps;
// * x264enc needs the usual raw caps.
let preview_tap_path = camera_preview_tap_path();
let raw_source_chain =
camera_raw_source_chain(&src_desc, &src_caps, width, height, fps, source_profile);
let desc = if preview_tap_path.is_some() {
if output_mjpeg {
if use_mjpg_source {
@ -174,8 +180,7 @@ impl CameraCapture {
)
} else {
format!(
"{src_desc} ! \
video/x-raw,width={width},height={height},framerate={fps}/1 ! \
"{raw_source_chain} ! \
tee name=t \
t. ! queue max-size-buffers=30 leaky=downstream ! \
videoconvert ! jpegenc quality={jpeg_quality} ! \
@ -192,7 +197,7 @@ impl CameraCapture {
jpegdec ! videorate ! video/x-raw,framerate={fps}/1 ! \
tee name=t \
t. ! queue max-size-buffers=30 leaky=downstream ! \
videoconvert ! {_enc_opts} ! \
videoconvert ! {enc_opts} ! \
h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \
appsink name=asink emit-signals=true max-buffers=60 drop=true \
t. ! queue max-size-buffers=2 leaky=downstream ! \
@ -201,10 +206,10 @@ impl CameraCapture {
)
} else {
format!(
"{src_desc} ! {src_caps} ! \
"{raw_source_chain} ! \
tee name=t \
t. ! queue max-size-buffers=30 leaky=downstream ! \
{preenc} {_enc_opts} ! \
{preenc} {enc_opts} ! \
h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \
appsink name=asink emit-signals=true max-buffers=60 drop=true \
t. ! queue max-size-buffers=2 leaky=downstream ! \
@ -222,8 +227,7 @@ impl CameraCapture {
)
} else {
format!(
"{src_desc} ! \
video/x-raw,width={width},height={height},framerate={fps}/1 ! \
"{raw_source_chain} ! \
videoconvert ! jpegenc quality={jpeg_quality} ! \
queue max-size-buffers=30 leaky=downstream ! \
appsink name=asink emit-signals=true max-buffers=60 drop=true"
@ -234,15 +238,15 @@ impl CameraCapture {
"{src_desc} ! \
image/jpeg,width={width},height={height} ! \
jpegdec ! videorate ! video/x-raw,framerate={fps}/1 ! \
videoconvert ! {_enc_opts} ! \
videoconvert ! {enc_opts} ! \
h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \
queue max-size-buffers=30 leaky=downstream ! \
appsink name=asink emit-signals=true max-buffers=60 drop=true"
)
} else {
format!(
"{src_desc} ! {src_caps} ! \
{preenc} {_enc_opts} ! \
"{raw_source_chain} ! \
{preenc} {enc_opts} ! \
h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \
queue max-size-buffers=30 leaky=downstream ! \
appsink name=asink emit-signals=true max-buffers=60 drop=true"
@ -262,7 +266,11 @@ impl CameraCapture {
.downcast::<gst_app::AppSink>()
.expect("appsink downcast");
pipeline.set_state(gst::State::Playing)?;
spawn_camera_bus_logger(&pipeline, dev_label.clone());
if let Err(err) = pipeline.set_state(gst::State::Playing) {
let _ = pipeline.set_state(gst::State::Null);
return Err(err.into());
}
tracing::info!("📸 webcam pipeline ▶️ device={dev_label}");
let preview_tap_running = if let Some(path) = preview_tap_path {
@ -288,6 +296,14 @@ impl CameraCapture {
let buf = sample.buffer()?;
let map = buf.map_readable().ok()?;
let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
static FIRST_CAMERA_PACKET: AtomicBool = AtomicBool::new(false);
if !FIRST_CAMERA_PACKET.swap(true, Ordering::Relaxed) {
tracing::info!(
bytes = map.as_slice().len(),
pts_us = pts,
"📸 upstream webcam frames flowing"
);
}
Some(VideoPacket {
id: 2,
pts,
@ -478,6 +494,64 @@ impl CameraCapture {
}
}
/// Choose the pre-encoder webcam format path.
///
/// V4L2 webcams often expose 720p/30 as MJPEG only, so the default accepts
/// either raw frames or MJPEG unless the operator explicitly pins a format.
fn camera_source_profile(allow_v4l2_auto_decode: bool) -> CameraSourceProfile {
    // Sources that cannot auto-decode are always treated as raw.
    if !allow_v4l2_auto_decode {
        return CameraSourceProfile::Raw;
    }
    // The legacy boolean toggle wins and forces the MJPEG path.
    if std::env::var("LESAVKA_CAM_MJPG").is_ok() {
        return CameraSourceProfile::Mjpeg;
    }
    // Otherwise honor an explicit format pin, falling back to auto-decode.
    let requested = std::env::var("LESAVKA_CAM_FORMAT")
        .ok()
        .map(|value| value.trim().to_ascii_lowercase());
    match requested.as_deref() {
        Some("mjpg" | "mjpeg" | "jpeg") => CameraSourceProfile::Mjpeg,
        Some("raw" | "yuyv" | "yuy2") => CameraSourceProfile::Raw,
        _ => CameraSourceProfile::AutoDecode,
    }
}
/// Build the source-to-raw-video chain consumed by the encoder and preview tap.
fn camera_raw_source_chain(
    src_desc: &str,
    src_caps: &str,
    width: u32,
    height: u32,
    fps: u32,
    profile: CameraSourceProfile,
) -> String {
    // Pinned raw formats pass the negotiated caps straight through.
    if profile == CameraSourceProfile::Raw {
        return format!("{src_desc} ! {src_caps}");
    }
    // Pinned MJPEG decodes to raw video at the requested geometry/rate.
    if profile == CameraSourceProfile::Mjpeg {
        return format!(
            "{src_desc} ! \
             image/jpeg,width={width},height={height},framerate={fps}/1 ! \
             jpegdec ! videoconvert ! videoscale ! videorate ! \
             video/x-raw,width={width},height={height},framerate={fps}/1"
        );
    }
    // Auto-decode: advertise both raw and MJPEG and let decodebin pick.
    let accepted = camera_auto_decode_caps(width, height, fps);
    format!(
        "{src_desc} ! \
         capsfilter caps=\"{accepted}\" ! \
         decodebin ! videoconvert ! videoscale ! videorate ! \
         video/x-raw,width={width},height={height},framerate={fps}/1,pixel-aspect-ratio=1/1"
    )
}
/// Caps string that lets decodebin negotiate either raw webcam frames or MJPEG.
fn camera_auto_decode_caps(width: u32, height: u32, fps: u32) -> String {
    // Both branches share the same geometry/rate constraint.
    let geometry =
        format!("width=(int){width},height=(int){height},framerate=(fraction){fps}/1");
    format!("video/x-raw,{geometry};image/jpeg,{geometry}")
}
fn camera_preview_tap_path() -> Option<PathBuf> {
std::env::var(CAMERA_PREVIEW_TAP_ENV)
.ok()
@ -494,15 +568,41 @@ fn camera_preview_tap_branch() -> String {
)
}
/// Publish tiny local preview frames so the launcher can prove uplink activity.
fn spawn_camera_preview_tap(sink: gst_app::AppSink, path: PathBuf) -> Arc<AtomicBool> {
let running = Arc::new(AtomicBool::new(true));
let thread_running = Arc::clone(&running);
thread::spawn(move || {
let mut wrote_first = false;
let mut empty_polls = 0_u64;
while thread_running.load(Ordering::Acquire) {
if let Some(sample) = sink.try_pull_sample(gst::ClockTime::from_mseconds(250)) {
if let Err(err) = write_camera_preview_tap(&path, &sample) {
tracing::debug!("📸 local uplink preview tap write failed: {err:#}");
thread::sleep(Duration::from_millis(100));
empty_polls = 0;
match write_camera_preview_tap(&path, &sample) {
Ok(info) => {
if !wrote_first {
wrote_first = true;
tracing::info!(
path = %path.display(),
width = info.width,
height = info.height,
stride = info.stride,
"📸 local uplink preview tap publishing frames"
);
}
}
Err(err) => {
tracing::debug!("📸 local uplink preview tap write failed: {err:#}");
thread::sleep(Duration::from_millis(100));
}
}
} else if !wrote_first {
empty_polls += 1;
if empty_polls == 20 || empty_polls.is_multiple_of(120) {
tracing::warn!(
path = %path.display(),
"📸 local uplink preview tap is still waiting for webcam frames"
);
}
}
}
@ -510,7 +610,17 @@ fn spawn_camera_preview_tap(sink: gst_app::AppSink, path: PathBuf) -> Arc<Atomic
running
}
fn write_camera_preview_tap(path: &Path, sample: &gst::Sample) -> anyhow::Result<()> {
/// Geometry of the preview frame most recently published to disk.
struct CameraPreviewTapInfo {
    // Frame width in pixels, as read from the sample caps.
    width: i32,
    // Frame height in pixels, as read from the sample caps.
    height: i32,
    // Bytes per row, derived from mapped buffer size / row count.
    stride: usize,
}
/// Atomically write one RGBA preview sample for the launcher status pane.
fn write_camera_preview_tap(
path: &Path,
sample: &gst::Sample,
) -> anyhow::Result<CameraPreviewTapInfo> {
let caps = sample.caps().context("preview tap sample missing caps")?;
let structure = caps
.structure(0)
@ -527,7 +637,11 @@ fn write_camera_preview_tap(path: &Path, sample: &gst::Sample) -> anyhow::Result
let map = buffer
.map_readable()
.context("preview tap buffer unreadable")?;
let stride = map.as_slice().len() / height.max(1) as usize;
let row_count = usize::try_from(height)
.ok()
.filter(|height| *height > 0)
.unwrap_or(1);
let stride = map.as_slice().len() / row_count;
let tmp_path = path.with_extension("tmp");
let mut file = std::fs::File::create(&tmp_path)
.with_context(|| format!("creating {}", tmp_path.display()))?;
@ -535,7 +649,46 @@ fn write_camera_preview_tap(path: &Path, sample: &gst::Sample) -> anyhow::Result
file.write_all(map.as_slice())?;
file.sync_all().ok();
std::fs::rename(&tmp_path, path).with_context(|| format!("publishing {}", path.display()))?;
Ok(())
Ok(CameraPreviewTapInfo {
width,
height,
stride,
})
}
/// Forward camera bus warnings/errors into the relay log with the device label.
fn spawn_camera_bus_logger(pipeline: &gst::Pipeline, device: String) {
let Some(bus) = pipeline.bus() else {
return;
};
std::thread::spawn(move || {
use gst::MessageView::{Error, StateChanged, Warning};
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
StateChanged(s)
if s.current() == gst::State::Playing
&& msg.src().is_some_and(|source| {
source.type_() == gst::Pipeline::static_type()
}) =>
{
tracing::info!(%device, "📸 camera pipeline ▶️");
}
Error(e) => tracing::error!(
%device,
"📸💥 camera: {} ({})",
e.error(),
e.debug().unwrap_or_default()
),
Warning(w) => tracing::warn!(
%device,
"📸⚠️ camera: {} ({})",
w.error(),
w.debug().unwrap_or_default()
),
_ => {}
}
}
});
}
#[cfg(not(coverage))]

View File

@ -93,9 +93,10 @@ impl MicrophoneCapture {
});
}
pipeline
.set_state(gst::State::Playing)
.context("start mic pipeline")?;
if let Err(err) = pipeline.set_state(gst::State::Playing) {
let _ = pipeline.set_state(gst::State::Null);
return Err(err).context("start mic pipeline");
}
maybe_spawn_mic_gain_control(volume);
let level_tap_running = if let Some(path) = level_tap_path {
let level_sink = pipeline
@ -140,7 +141,13 @@ impl MicrophoneCapture {
}
}
/// Resolve launcher-selected mic names while preserving Pulse catalog routing.
fn resolve_source_desc(fragment: &str) -> Option<String> {
if looks_like_pulse_source_name(fragment)
&& let Some(full) = Self::pulse_source_by_substr(fragment)
{
return Some(Self::pulse_source_desc(Some(&full)));
}
if Self::pipewire_source_available()
&& let Some(full) = Self::pipewire_source_by_substr(fragment)
{
@ -150,6 +157,10 @@ impl MicrophoneCapture {
}
fn pipewire_source_available() -> bool {
    // Coverage builds let tests force the Pulse fallback path without
    // uninstalling the pipewiresrc element.
    #[cfg(coverage)]
    if std::env::var("LESAVKA_MIC_DISABLE_PIPEWIRE").is_ok() {
        return false;
    }
    // Available only when the GStreamer registry knows pipewiresrc.
    gst::ElementFactory::find("pipewiresrc").is_some()
}
@ -258,11 +269,13 @@ fn microphone_pipeline_desc(
if level_tap_enabled {
format!(
"{source_desc} ! \
audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
audioconvert ! audioresample ! \
audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
volume name=mic_input_gain volume={gain} ! \
tee name=t \
t. ! queue max-size-buffers=100 leaky=downstream ! \
audioconvert ! audioresample ! \
audio/x-raw,channels=2,rate=48000 ! \
{encoder} bitrate=128000 ! \
{parser} ! \
appsink name=asink emit-signals=true max-buffers=50 drop=true \
@ -273,9 +286,11 @@ fn microphone_pipeline_desc(
} else {
format!(
"{source_desc} ! \
audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
audioconvert ! audioresample ! \
audio/x-raw,channels=2,rate=48000 ! \
volume name=mic_input_gain volume={gain} ! \
audioconvert ! audioresample ! \
audio/x-raw,channels=2,rate=48000 ! \
{encoder} bitrate=128000 ! \
{parser} ! \
queue max-size-buffers=100 leaky=downstream ! \
@ -284,6 +299,14 @@ fn microphone_pipeline_desc(
}
}
/// Detect launcher catalog names that should be opened through Pulse directly.
fn looks_like_pulse_source_name(source: &str) -> bool {
    const PULSE_PREFIXES: [&str; 3] = ["alsa_input.", "bluez_input.", "input."];
    let trimmed = source.trim();
    PULSE_PREFIXES
        .iter()
        .any(|prefix| trimmed.starts_with(prefix))
}
fn mic_gain_from_env() -> f64 {
std::env::var(MIC_GAIN_ENV)
.ok()

View File

@ -869,12 +869,14 @@ fn camera_preview_pipeline_desc(device: &str) -> String {
}
fn microphone_monitor_pipeline_desc(source: &str, sink: Option<&str>) -> String {
let source_element = if gst::ElementFactory::find("pipewiresrc").is_some() {
let source = gst_quote(source);
format!("pipewiresrc target-object=\"{source}\" do-timestamp=true")
} else {
let source_element = if looks_like_pulse_source_name(source)
|| gst::ElementFactory::find("pipewiresrc").is_none()
{
let source = gst_quote(source);
format!("pulsesrc device=\"{source}\" do-timestamp=true")
} else {
let source = gst_quote(source);
format!("pipewiresrc target-object=\"{source}\" do-timestamp=true")
};
let sink_prop = sink
.map(gst_quote)
@ -890,6 +892,13 @@ fn microphone_monitor_pipeline_desc(source: &str, sink: Option<&str>) -> String
)
}
/// Heuristic: catalog names with Pulse-style prefixes open via pulsesrc.
fn looks_like_pulse_source_name(source: &str) -> bool {
    let candidate = source.trim();
    ["alsa_input.", "bluez_input.", "input."]
        .into_iter()
        .any(|prefix| candidate.starts_with(prefix))
}
fn sample_to_frame(sample: &gst::Sample) -> Option<PreviewFrame> {
let caps = sample.caps()?;
let structure = caps.structure(0)?;
@ -1041,8 +1050,8 @@ fn build_wav_bytes(audio: &[u8], sample_rate: u32, channels: u16, bits_per_sampl
mod tests {
use super::{
MIC_REPLAY_MAX_BYTES, build_wav_bytes, camera_preview_pipeline_desc,
normalize_camera_selection, push_recent_audio, read_camera_preview_tap,
read_microphone_level_tap, resolve_camera_device,
microphone_monitor_pipeline_desc, normalize_camera_selection, push_recent_audio,
read_camera_preview_tap, read_microphone_level_tap, resolve_camera_device,
};
use std::sync::{Arc, Mutex};
@ -1074,6 +1083,16 @@ mod tests {
assert!(!desc.contains("v4l2src device=\"/dev/video0\" do-timestamp=true ! video/x-raw,"));
}
#[test]
fn microphone_monitor_uses_pulse_for_launcher_catalog_source_names() {
    // A Pulse catalog name (alsa_input.*) must route through pulsesrc even
    // when pipewiresrc is installed on the host.
    let desc = microphone_monitor_pipeline_desc(
        "alsa_input.usb-Neat_Microphones_Bumblebee_II_USB_Microphone-00.mono-fallback",
        None,
    );
    assert!(desc.contains("pulsesrc device=\"alsa_input.usb-Neat_Microphones"));
    assert!(!desc.contains("pipewiresrc target-object"));
}
#[test]
fn push_recent_audio_keeps_only_last_three_seconds() {
let buffer = Arc::new(Mutex::new(Vec::new()));

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.11.47"
version = "0.11.48"
edition = "2024"
build = "build.rs"

View File

@ -21,9 +21,9 @@
"loc": 381
},
"client/src/input/camera.rs": {
"clippy_warnings": 38,
"doc_debt": 12,
"loc": 566
"clippy_warnings": 14,
"doc_debt": 10,
"loc": 719
},
"client/src/input/inputs.rs": {
"clippy_warnings": 40,
@ -43,7 +43,7 @@
"client/src/input/microphone.rs": {
"clippy_warnings": 21,
"doc_debt": 13,
"loc": 375
"loc": 398
},
"client/src/input/mod.rs": {
"clippy_warnings": 0,
@ -63,7 +63,7 @@
"client/src/launcher/device_test.rs": {
"clippy_warnings": 67,
"doc_debt": 40,
"loc": 1129
"loc": 1148
},
"client/src/launcher/devices.rs": {
"clippy_warnings": 6,

View File

@ -18,7 +18,7 @@
},
"client/src/input/camera.rs": {
"line_percent": 95.24,
"loc": 566
"loc": 719
},
"client/src/input/inputs.rs": {
"line_percent": 96.39,
@ -33,8 +33,8 @@
"loc": 196
},
"client/src/input/microphone.rs": {
"line_percent": 97.83,
"loc": 375
"line_percent": 96.31,
"loc": 398
},
"client/src/input/mouse.rs": {
"line_percent": 97.32,

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.11.47"
version = "0.11.48"
edition = "2024"
autobins = false

View File

@ -28,6 +28,46 @@ mod camera_include_contract {
});
}
#[test]
#[serial]
fn camera_source_profile_defaults_to_auto_decode_for_v4l2_sources() {
    // With neither override set: v4l2 sources auto-decode, others stay raw.
    with_var("LESAVKA_CAM_MJPG", None::<&str>, || {
        with_var("LESAVKA_CAM_FORMAT", None::<&str>, || {
            assert_eq!(camera_source_profile(true), CameraSourceProfile::AutoDecode);
            assert_eq!(camera_source_profile(false), CameraSourceProfile::Raw);
        });
    });
    // Explicit format pins override the auto-decode default.
    with_var("LESAVKA_CAM_FORMAT", Some("raw"), || {
        assert_eq!(camera_source_profile(true), CameraSourceProfile::Raw);
    });
    with_var("LESAVKA_CAM_FORMAT", Some("mjpeg"), || {
        assert_eq!(camera_source_profile(true), CameraSourceProfile::Mjpeg);
    });
    // The legacy boolean toggle also forces the MJPEG path.
    with_var("LESAVKA_CAM_MJPG", Some("1"), || {
        assert_eq!(camera_source_profile(true), CameraSourceProfile::Mjpeg);
    });
}
#[test]
fn camera_auto_decode_caps_accept_raw_and_mjpeg_at_requested_profile() {
    // The caps string must advertise raw and MJPEG at the same geometry/rate.
    let caps = camera_auto_decode_caps(1280, 720, 30);
    assert!(caps.contains("video/x-raw,width=(int)1280,height=(int)720"));
    assert!(caps.contains("image/jpeg,width=(int)1280,height=(int)720"));
    assert!(caps.contains("framerate=(fraction)30/1"));
    // The auto-decode chain must route through capsfilter + decodebin.
    let chain = camera_raw_source_chain(
        "v4l2src device=/dev/video0 do-timestamp=true",
        "video/x-raw,width=1280,height=720,framerate=30/1",
        1280,
        720,
        30,
        CameraSourceProfile::AutoDecode,
    );
    assert!(chain.contains("decodebin ! videoconvert ! videoscale ! videorate"));
    assert!(chain.contains("capsfilter caps=\""));
}
#[test]
fn encoder_helpers_return_supported_defaults() {
init_gst();
@ -171,6 +211,77 @@ mod camera_include_contract {
);
}
#[test]
fn spawned_camera_preview_tap_tolerates_publish_errors() {
    init_gst();
    let dir = tempdir().expect("tempdir");
    // Parent directory is missing, so every publish attempt fails; the tap
    // thread must swallow the error and keep polling instead of crashing.
    let path = dir.path().join("missing-parent").join("preview.rgba");
    let pipeline: gst::Pipeline = gst::parse::launch(
        "appsrc name=src is-live=true format=time caps=video/x-raw,format=RGBA,width=2,height=2,framerate=1/1 ! \
         appsink name=sink emit-signals=false sync=false max-buffers=4 drop=true",
    )
    .expect("pipeline")
    .downcast()
    .expect("pipeline cast");
    let src: gst_app::AppSrc = pipeline
        .by_name("src")
        .expect("appsrc")
        .downcast()
        .expect("appsrc cast");
    let sink: gst_app::AppSink = pipeline
        .by_name("sink")
        .expect("appsink")
        .downcast()
        .expect("appsink cast");
    pipeline.set_state(gst::State::Playing).expect("playing");
    let running = spawn_camera_preview_tap(sink, path);
    // One 2x2 RGBA frame (16 bytes) is enough to trigger a write attempt.
    src.push_buffer(gst::Buffer::from_slice(vec![255_u8; 16]))
        .expect("push buffer");
    std::thread::sleep(std::time::Duration::from_millis(150));
    // Shutdown must stay clean even though no frame was ever published.
    running.store(false, Ordering::Release);
    let _ = pipeline.set_state(gst::State::Null);
}
#[test]
#[serial]
fn new_covers_preview_tap_output_format_combinations() {
    init_gst();
    let dir = tempdir().expect("tempdir");
    let path = dir.path().join("preview.rgba");
    let mjpeg_cfg = CameraConfig {
        codec: CameraCodec::Mjpeg,
        width: 320,
        height: 240,
        fps: 15,
    };
    let h264_cfg = CameraConfig {
        codec: CameraCodec::H264,
        width: 320,
        height: 240,
        fps: 15,
    };
    with_var(
        "LESAVKA_UPLINK_CAMERA_PREVIEW",
        Some(path.to_string_lossy().to_string()),
        || {
            // /dev/video42 may not exist on CI, so only construction is
            // exercised; either outcome proves the branch did not panic.
            let mjpeg_out = CameraCapture::new(Some("/dev/video42"), Some(mjpeg_cfg));
            assert!(mjpeg_out.is_ok() || mjpeg_out.is_err());
            with_var("LESAVKA_CAM_MJPG", Some("1"), || {
                // MJPEG source -> MJPEG output (passthrough branch).
                let mjpeg_passthrough =
                    CameraCapture::new(Some("/dev/video42"), Some(mjpeg_cfg));
                assert!(mjpeg_passthrough.is_ok() || mjpeg_passthrough.is_err());
                // MJPEG source -> H.264 output (software re-encode branch).
                let mjpeg_to_h264 = CameraCapture::new(Some("/dev/video42"), Some(h264_cfg));
                assert!(mjpeg_to_h264.is_ok() || mjpeg_to_h264.is_err());
            });
        },
    );
}
#[test]
fn new_stub_and_pull_are_stable_without_frames() {
init_gst();
@ -208,6 +319,9 @@ mod camera_include_contract {
assert!(mjpeg_out.is_ok() || mjpeg_out.is_err());
with_var("LESAVKA_CAM_MJPG", Some("1"), || {
let mjpeg_passthrough = CameraCapture::new(Some("/dev/video42"), Some(mjpeg_cfg));
assert!(mjpeg_passthrough.is_ok() || mjpeg_passthrough.is_err());
let h264_cfg = CameraConfig {
codec: CameraCodec::H264,
width: 640,

View File

@ -181,6 +181,11 @@ JSON
2.5,
true,
);
assert!(
with_tap
.contains("audiotestsrc is-live=true ! audioconvert ! audioresample ! audio/x-raw")
);
assert!(!with_tap.contains("audiotestsrc is-live=true ! audio/x-raw"));
assert!(with_tap.contains("tee name=t"));
assert!(with_tap.contains("appsink name=level_sink"));
assert!(with_tap.contains("volume name=mic_input_gain volume=2.500"));

View File

@ -0,0 +1,99 @@
//! Include-based coverage for microphone source-name routing.
//!
//! Scope: include `client/src/input/microphone.rs` and cover selected-source
//! routing heuristics for launcher catalog names.
//! Targets: `client/src/input/microphone.rs`.
//! Why: launcher-selected Pulse source names must not regress to PipeWire
//! negotiation when the catalog already provides a concrete Pulse device name.
#[allow(warnings)]
mod microphone_source_contract {
include!(env!("LESAVKA_CLIENT_MICROPHONE_SRC"));
use serial_test::serial;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use temp_env::with_var;
use tempfile::tempdir;
/// Drop `body` into `dir/name` and mark it executable (0755) for PATH fakes.
fn write_executable(dir: &Path, name: &str, body: &str) {
    let script_path = dir.join(name);
    fs::write(&script_path, body).expect("write script");
    let metadata = fs::metadata(&script_path).expect("metadata");
    let mut permissions = metadata.permissions();
    permissions.set_mode(0o755);
    fs::set_permissions(script_path, permissions).expect("chmod");
}
/// Run `f` with a fake executable `name` prepended to PATH.
fn with_fake_command(name: &str, script_body: &str, f: impl FnOnce()) {
    let dir = tempdir().expect("tempdir");
    write_executable(dir.path(), name, script_body);
    let prior = std::env::var("PATH").unwrap_or_default();
    // Prepend so the fake shadows any real binary of the same name.
    let merged = if prior.is_empty() {
        dir.path().display().to_string()
    } else {
        format!("{}:{prior}", dir.path().display())
    };
    with_var("PATH", Some(merged), f);
}
/// Shadow `pactl` with a fake script for the duration of `f`.
fn with_fake_pactl(script_body: &str, f: impl FnOnce()) {
    with_fake_command("pactl", script_body, f);
}
/// Shadow `pw-dump` with a fake script for the duration of `f`.
fn with_fake_pw_dump(script_body: &str, f: impl FnOnce()) {
    with_fake_command("pw-dump", script_body, f);
}
#[test]
fn pulse_source_name_heuristic_matches_launcher_catalog_names() {
    // Pulse-style catalog prefixes are recognized...
    assert!(looks_like_pulse_source_name(
        "alsa_input.usb-Neat_Microphones_Bumblebee_II_USB_Microphone-00.mono-fallback"
    ));
    assert!(looks_like_pulse_source_name("bluez_input.headset"));
    // ...while friendly PipeWire display names are not.
    assert!(!looks_like_pulse_source_name("PipeWire Nick Mic"));
}
#[test]
#[serial]
fn resolve_source_desc_prefers_pulse_for_pulse_catalog_names() {
    // Fake `pactl` lists exactly one Pulse source matching the substring.
    let pactl_script = r#"#!/usr/bin/env sh
if [ "$1" = "list" ] && [ "$2" = "short" ] && [ "$3" = "sources" ]; then
echo "1 alsa_input.usb-Bumblebee_II-00.mono-fallback module-alsa-card.c s16le 1ch 48000Hz RUNNING"
exit 0
fi
exit 0
"#;
    // Fake `pw-dump` advertises the same node, so PipeWire lookup could also
    // match — Pulse must still win for a Pulse catalog name.
    let pw_script = r#"#!/usr/bin/env sh
cat <<'JSON'
[
{"info":{"props":{"media.class":"Audio/Source","node.name":"alsa_input.usb-Bumblebee_II-00.mono-fallback"}}}
]
JSON
"#;
    with_fake_pactl(pactl_script, || {
        with_fake_pw_dump(pw_script, || {
            let desc = MicrophoneCapture::resolve_source_desc("alsa_input.usb-Bumblebee_II")
                .expect("pulse source");
            assert!(
                desc.contains("pulsesrc device=alsa_input.usb-Bumblebee_II-00.mono-fallback"),
                "Pulse catalog source names should route through pulsesrc: {desc}"
            );
        });
    });
}
#[test]
#[cfg(coverage)]
#[serial]
fn default_source_desc_can_fall_back_to_pulse_when_pipewire_is_disabled() {
    with_var("LESAVKA_MIC_TEST_SOURCE_DESC", None::<&str>, || {
        // Disabling PipeWire in coverage builds must yield the pulsesrc default.
        with_var("LESAVKA_MIC_DISABLE_PIPEWIRE", Some("1"), || {
            assert_eq!(
                MicrophoneCapture::default_source_desc(),
                "pulsesrc do-timestamp=true"
            );
        });
    });
}
}

View File

@ -0,0 +1,32 @@
//! Include-based coverage for microphone startup cleanup paths.
//!
//! Scope: include `client/src/input/microphone.rs` and exercise startup
//! failures without requiring a live microphone.
//! Targets: `client/src/input/microphone.rs`.
//! Why: startup failures should move the pipeline back to NULL before the
//! capture object returns an error.
#[allow(warnings)]
mod microphone_startup_contract {
include!(env!("LESAVKA_CLIENT_MICROPHONE_SRC"));
use serial_test::serial;
use temp_env::with_var;
#[test]
#[cfg(coverage)]
#[serial]
fn startup_failure_cleans_up_pipeline_state() {
    gst::init().ok();
    with_var("LESAVKA_MIC_SOURCE", None::<&str>, || {
        with_var(
            "LESAVKA_MIC_TEST_SOURCE_DESC",
            // filesrc on a missing file fails the NULL->PLAYING transition,
            // exercising the cleanup-and-return-error path in new().
            Some("filesrc location=/definitely-missing-lesavka-mic.raw"),
            || {
                let result = MicrophoneCapture::new();
                assert!(result.is_err(), "missing filesrc should fail startup");
            },
        );
    });
}
}

View File

@ -0,0 +1,47 @@
//! Include-based coverage for microphone level-tap publishing.
//!
//! Scope: include `client/src/input/microphone.rs` and exercise level-tap
//! publishing behavior without requiring a live microphone.
//! Targets: `client/src/input/microphone.rs`.
//! Why: the local launcher tap should stay best-effort and never destabilize
//! microphone uplink startup.
#[allow(warnings)]
mod microphone_tap_contract {
include!(env!("LESAVKA_CLIENT_MICROPHONE_SRC"));
use tempfile::tempdir;
#[test]
fn spawned_mic_level_tap_tolerates_publish_errors() {
    gst::init().ok();
    let dir = tempdir().expect("tempdir");
    // Parent directory is missing, so every level publish fails; the tap
    // thread must log and continue instead of crashing.
    let path = dir.path().join("missing-parent").join("mic-level.value");
    let pipeline: gst::Pipeline = gst::parse::launch(
        "appsrc name=src is-live=true format=time caps=audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
         appsink name=sink emit-signals=false sync=false max-buffers=4 drop=true",
    )
    .expect("pipeline")
    .downcast()
    .expect("pipeline cast");
    let src: gst_app::AppSrc = pipeline
        .by_name("src")
        .expect("appsrc")
        .downcast()
        .expect("appsrc cast");
    let sink: gst_app::AppSink = pipeline
        .by_name("sink")
        .expect("appsink")
        .downcast()
        .expect("appsink cast");
    pipeline.set_state(gst::State::Playing).expect("playing");
    let running = spawn_mic_level_tap(sink, path);
    // Push one full-scale stereo S16LE sample block to trigger a publish.
    src.push_buffer(gst::Buffer::from_slice(i16::MAX.to_le_bytes().repeat(4)))
        .expect("push buffer");
    std::thread::sleep(std::time::Duration::from_millis(100));
    running.store(false, AtomicOrdering::Release);
    let _ = pipeline.set_state(gst::State::Null);
}
}