lesavka/client/src/input/camera/capture_pipeline.rs

594 lines
24 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

impl CameraCapture {
    /// Convenience constructor: resolve the capture profile from `cfg` and the
    /// environment rather than an explicit override.
    pub fn new(device_fragment: Option<&str>, cfg: Option<CameraConfig>) -> anyhow::Result<Self> {
        Self::new_with_capture_profile(device_fragment, cfg, None)
    }

    /// Build and start the webcam capture pipeline:
    /// source → scale/convert → (encoder | JPEG) → appsink, optionally with a
    /// tee'd local preview branch.
    ///
    /// * `device_fragment` — `None` defaults to `/dev/video0`; `"test"` /
    ///   `"videotestsrc"` selects a synthetic pattern; a `/dev/…` path is used
    ///   verbatim; any other string is resolved via `Self::find_device`.
    /// * `cfg` — negotiated camera profile (codec, geometry, fps), if any.
    /// * `capture_profile_override` — `(width, height, fps)` that, when present,
    ///   wins over the profile resolved from `cfg` / environment variables.
    ///
    /// Errors if the requested device cannot be found, the pipeline description
    /// fails to parse, or the pipeline refuses to reach `Playing`.
    pub fn new_with_capture_profile(
        device_fragment: Option<&str>,
        cfg: Option<CameraConfig>,
        capture_profile_override: Option<(u32, u32, u32)>,
    ) -> anyhow::Result<Self> {
        // Idempotent: a second init returns Err, which is deliberately ignored.
        gst::init().ok();
        // Select source: V4L2 device or test pattern.
        let (src_desc, dev_label, allow_mjpg_source) = match device_fragment {
            Some(fragment)
                if fragment.eq_ignore_ascii_case("test")
                    || fragment.eq_ignore_ascii_case("videotestsrc") =>
            {
                let pattern =
                    std::env::var("LESAVKA_CAM_TEST_PATTERN").unwrap_or_else(|_| "smpte".into());
                (
                    format!("videotestsrc is-live=true pattern={pattern}"),
                    format!("videotestsrc:{pattern}"),
                    // Test source emits raw frames only — no MJPG capture mode.
                    false,
                )
            }
            Some(path) if path.starts_with("/dev/") => (
                format!("v4l2src device={path} do-timestamp=true"),
                path.to_string(),
                true,
            ),
            Some(fragment) => {
                let dev = Self::find_device(fragment)
                    .with_context(|| format!("requested camera '{fragment}' was not found"))?;
                (format!("v4l2src device={dev} do-timestamp=true"), dev, true)
            }
            None => {
                let dev = "/dev/video0".to_string();
                (format!("v4l2src device={dev} do-timestamp=true"), dev, true)
            }
        };
        // Output codec: negotiated config wins; otherwise LESAVKA_CAM_CODEC,
        // defaulting to H.264 for anything unrecognised.
        let output_codec = cfg.map_or_else(
            || {
                match std::env::var("LESAVKA_CAM_CODEC")
                    .ok()
                    .map(|value| value.trim().to_ascii_lowercase())
                    .as_deref()
                {
                    Some("mjpeg" | "mjpg" | "jpeg") => CameraCodec::Mjpeg,
                    Some("hevc" | "h265" | "h.265") => CameraCodec::Hevc,
                    _ => CameraCodec::H264,
                }
            },
            |cfg| cfg.codec,
        );
        let output_mjpeg = matches!(output_codec, CameraCodec::Mjpeg);
        let output_hevc = matches!(output_codec, CameraCodec::Hevc);
        let jpeg_quality = env_u32("LESAVKA_CAM_JPEG_QUALITY", 85).clamp(1, 100);
        // Capture profile (what the sensor is asked for) and output profile
        // (what is emitted toward the UVC gadget) are resolved independently.
        let capture_profile = capture_profile_override.unwrap_or_else(|| resolved_capture_profile(cfg));
        let (capture_width, capture_height, capture_fps) = capture_profile;
        let (width, height, fps) = resolved_output_profile(cfg, capture_profile);
        let keyframe_interval = if output_hevc {
            hevc_keyframe_interval(fps)
        } else {
            env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps)
        };
        let source_profile = camera_source_profile(allow_mjpg_source);
        // Hardware NVENC HEVC route: bypass GStreamer encoding entirely and let
        // FFmpeg produce the byte-stream (see new_ffmpeg_hevc_nvenc).
        #[cfg(not(coverage))]
        if output_hevc && Self::should_use_ffmpeg_hevc_nvenc() {
            return Self::new_ffmpeg_hevc_nvenc(
                &dev_label,
                source_profile,
                capture_width,
                capture_height,
                capture_fps,
                width,
                height,
                fps,
                keyframe_interval,
                camera_preview_tap_path().is_some(),
            );
        }
        let use_mjpg_source = source_profile == CameraSourceProfile::Mjpeg;
        // MJPG passthrough (no decode/re-encode) is only valid when the capture
        // profile already matches the requested output profile exactly.
        let passthrough_mjpg_source =
            use_mjpg_source && capture_profile == (width, height, fps);
        // NOTE(review): both arms of this conditional select the same encoder —
        // the `use_mjpg_source && !output_mjpeg` split is redundant as written.
        let (enc, kf_prop) = if use_mjpg_source && !output_mjpeg {
            if output_hevc {
                Self::choose_hevc_encoder()?
            } else {
                Self::choose_encoder()?
            }
        } else if output_hevc {
            Self::choose_hevc_encoder()?
        } else {
            Self::choose_encoder()?
        };
        match source_profile {
            CameraSourceProfile::Mjpeg if !output_mjpeg => {
                tracing::info!("📸 using MJPG source with transcoded output");
            }
            CameraSourceProfile::AutoDecode => {
                tracing::info!("📸 using auto-decoded webcam source (raw/MJPEG accepted)");
            }
            _ => {}
        }
        let enc_opts = Self::encoder_options(enc, kf_prop, keyframe_interval);
        if output_mjpeg {
            tracing::info!("📸 outputting MJPEG frames for UVC (quality={jpeg_quality})");
        } else if output_hevc {
            tracing::info!("📸 using HEVC encoder element: {enc}");
        } else {
            tracing::info!("📸 using encoder element: {enc}");
        }
        #[cfg(not(coverage))]
        let have_nvvidconv = gst::ElementFactory::find("nvvidconv").is_some();
        // Pre-encoder conversion chain, specialised per encoder element:
        // * NVIDIA encoders prefer NV12, using NVMM when Jetson's converter is present.
        // * Vulkan/VAAPI/V4L2 hardware encoders also get explicit NV12 caps.
        // * x264enc/x265enc keep their software-friendly raw caps.
        let preenc = match enc {
            // ───────────────────────────────────────────────────────────────────
            // Jetson (has nvvidconv)          Desktop (falls back to videoconvert)
            // ───────────────────────────────────────────────────────────────────
            #[cfg(not(coverage))]
            "nvh264enc" | "nvh265enc" if have_nvvidconv =>
                format!(
                    "nvvidconv ! video/x-raw(memory:NVMM),format=NV12,width={width},height={height},framerate={fps}/1 !"
                ),
            #[cfg(not(coverage))]
            "nvh264enc" | "nvh265enc" /* else */ =>
                format!(
                    "videoconvert ! video/x-raw,format=NV12,width={width},height={height},framerate={fps}/1 !"
                ),
            #[cfg(not(coverage))]
            "x265enc" =>
                format!(
                    "videoconvert ! video/x-raw,format=I420,width={width},height={height},framerate={fps}/1 !"
                ),
            #[cfg(not(coverage))]
            "vulkanh264enc" =>
                format!(
                    "videoconvert ! video/x-raw,format=NV12,width={width},height={height},framerate={fps}/1 ! \
                     vulkanupload ! video/x-raw(memory:VulkanImage),format=NV12,width={width},height={height},framerate={fps}/1 !"
                ),
            #[cfg(not(coverage))]
            "vaapih264enc" | "vah265enc" | "vaapih265enc" | "v4l2h265enc" =>
                format!(
                    "videoconvert ! video/x-raw,format=NV12,width={width},height={height},framerate={fps}/1 !"
                ),
            _ =>
                format!(
                    "videoconvert ! video/x-raw,width={width},height={height},framerate={fps}/1 !"
                ),
        };
        // Optional local preview tap: when a tap path is configured, the
        // pipeline grows a tee with a second (leaky, shallow) branch feeding it.
        let preview_tap_path = camera_preview_tap_path();
        let preview_tap_branch = camera_preview_tap_branch(width, height, fps);
        let source_raw_caps = format!(
            "video/x-raw,width={capture_width},height={capture_height},framerate={capture_fps}/1"
        );
        let raw_source_chain = camera_raw_source_chain(
            &src_desc,
            &source_raw_caps,
            capture_width,
            capture_height,
            capture_fps,
            source_profile,
        );
        // Source chain followed by scale/convert to the output profile.
        let normalized_raw_chain = format!(
            "{raw_source_chain} ! {}",
            camera_output_raw_chain(width, height, fps)
        );
        let encoded_parse_chain = if output_hevc {
            "h265parse config-interval=-1 ! video/x-h265,stream-format=byte-stream,alignment=au"
        } else {
            "h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au"
        };
        // Assemble the gst-launch description. Shapes: {with preview tee, plain}
        // × {MJPEG passthrough, MJPEG re-encode, H.264/HEVC encode}.
        let desc = if preview_tap_path.is_some() {
            if output_mjpeg {
                if passthrough_mjpg_source {
                    format!(
                        "{src_desc} ! \
                         image/jpeg,width={width},height={height},framerate={fps}/1 ! \
                         tee name=t \
                         t. ! queue max-size-buffers=30 leaky=downstream ! \
                         appsink name=asink emit-signals=true max-buffers=60 drop=true \
                         t. ! queue max-size-buffers=2 leaky=downstream ! jpegdec ! \
                         {preview_tap_branch}"
                    )
                } else {
                    format!(
                        "{normalized_raw_chain} ! \
                         tee name=t \
                         t. ! queue max-size-buffers=30 leaky=downstream ! \
                         videoconvert ! jpegenc quality={jpeg_quality} ! \
                         appsink name=asink emit-signals=true max-buffers=60 drop=true \
                         t. ! queue max-size-buffers=2 leaky=downstream ! \
                         {preview_tap_branch}"
                    )
                }
            } else {
                format!(
                    "{normalized_raw_chain} ! \
                     tee name=t \
                     t. ! queue max-size-buffers=30 leaky=downstream ! \
                     {preenc} {enc_opts} ! \
                     {encoded_parse_chain} ! \
                     appsink name=asink emit-signals=true max-buffers=60 drop=true \
                     t. ! queue max-size-buffers=2 leaky=downstream ! \
                     {preview_tap_branch}"
                )
            }
        } else if output_mjpeg {
            if passthrough_mjpg_source {
                format!(
                    "{src_desc} ! \
                     image/jpeg,width={width},height={height},framerate={fps}/1 ! \
                     queue max-size-buffers=30 leaky=downstream ! \
                     appsink name=asink emit-signals=true max-buffers=60 drop=true"
                )
            } else {
                format!(
                    "{normalized_raw_chain} ! \
                     videoconvert ! jpegenc quality={jpeg_quality} ! \
                     queue max-size-buffers=30 leaky=downstream ! \
                     appsink name=asink emit-signals=true max-buffers=60 drop=true"
                )
            }
        } else {
            format!(
                "{normalized_raw_chain} ! \
                 {preenc} {enc_opts} ! \
                 {encoded_parse_chain} ! \
                 queue max-size-buffers=30 leaky=downstream ! \
                 appsink name=asink emit-signals=true max-buffers=60 drop=true"
            )
        };
        tracing::info!(
            %enc,
            capture_width,
            capture_height,
            capture_fps,
            output_width = width,
            output_height = height,
            output_fps = fps,
            ?desc,
            "📸 using encoder element"
        );
        let pipeline: gst::Pipeline = gst::parse::launch(&desc)
            .context("gst parse_launch(cam)")?
            .downcast::<gst::Pipeline>()
            .expect("not a pipeline");
        tracing::debug!("📸 pipeline built OK setting PLAYING…");
        // The appsink is guaranteed to exist because the description above
        // always names one "asink".
        let sink: gst_app::AppSink = pipeline
            .by_name("asink")
            .expect("appsink element not found")
            .downcast::<gst_app::AppSink>()
            .expect("appsink downcast");
        spawn_camera_bus_logger(&pipeline, dev_label.clone());
        if let Err(err) = pipeline.set_state(gst::State::Playing) {
            // Tear back down to Null so the device is released on failure.
            let _ = pipeline.set_state(gst::State::Null);
            return Err(err.into());
        }
        tracing::info!("📸 webcam pipeline ▶️ device={dev_label}");
        let preview_tap_running = if let Some(path) = preview_tap_path {
            let preview_sink = pipeline
                .by_name("preview_sink")
                .context("missing camera preview tap appsink")?
                .downcast::<gst_app::AppSink>()
                .expect("camera preview tap appsink");
            Some(spawn_camera_preview_tap(preview_sink, path))
        } else {
            None
        };
        Ok(Self {
            pipeline,
            sink,
            ffmpeg_child: None,
            preview_tap_running,
            pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(),
            // Nominal per-frame spacing in µs, used when a buffer carries no
            // duration; guarded against fps=0 and a zero result.
            frame_duration_us: (1_000_000u64 / u64::from(fps.max(1))).max(1),
        })
    }

    /// Spawn FFmpeg with `hevc_nvenc` and feed its stdout byte-stream into a
    /// minimal `fdsrc` GStreamer pipeline, so `pull()` keeps a single code path.
    ///
    /// Supports `/dev/…` V4L2 devices and the `videotestsrc:` label; any other
    /// source bails. The launcher preview tap is not available on this route.
    #[cfg(not(coverage))]
    fn new_ffmpeg_hevc_nvenc(
        dev_label: &str,
        source_profile: CameraSourceProfile,
        capture_width: u32,
        capture_height: u32,
        capture_fps: u32,
        width: u32,
        height: u32,
        fps: u32,
        keyframe_interval: u32,
        preview_tap_enabled: bool,
    ) -> anyhow::Result<Self> {
        if preview_tap_enabled {
            tracing::warn!(
                "📸 HEVC NVENC route is active; launcher preview tap is temporarily disabled for this hardware encode path"
            );
        }
        let bitrate_kbit = env_u32("LESAVKA_CAM_HEVC_KBIT", 3000).max(250);
        // Guard against zero values that would break rate/GOP arguments below.
        let fps = fps.max(1);
        let capture_fps = capture_fps.max(1);
        let keyframe_interval = keyframe_interval.max(1);
        let mut command = Command::new("ffmpeg");
        command
            .arg("-hide_banner")
            .arg("-loglevel")
            .arg(std::env::var("LESAVKA_FFMPEG_LOGLEVEL").unwrap_or_else(|_| "warning".into()))
            .arg("-nostdin")
            .arg("-fflags")
            .arg("nobuffer")
            .arg("-flags")
            .arg("low_delay")
            .arg("-use_wallclock_as_timestamps")
            .arg("1");
        if dev_label.starts_with("/dev/") {
            command
                .arg("-f")
                .arg("v4l2")
                .arg("-framerate")
                .arg(capture_fps.to_string())
                .arg("-video_size")
                .arg(format!("{capture_width}x{capture_height}"));
            if source_profile == CameraSourceProfile::Mjpeg {
                command.arg("-input_format").arg("mjpeg");
            }
            command.arg("-i").arg(dev_label);
        } else if dev_label.starts_with("videotestsrc:") {
            command
                .arg("-f")
                .arg("lavfi")
                .arg("-i")
                .arg(format!(
                    "testsrc2=size={capture_width}x{capture_height}:rate={capture_fps}"
                ));
        } else {
            anyhow::bail!("FFmpeg HEVC NVENC route does not understand camera source '{dev_label}'");
        }
        let video_filter =
            format!("scale={width}:{height}:flags=fast_bilinear,fps={fps},format=nv12");
        let bitrate = format!("{bitrate_kbit}k");
        // CBR at the configured bitrate, no B-frames, forced IDR — low-latency
        // settings matched to the live relay.
        command
            .arg("-an")
            .arg("-sn")
            .arg("-dn")
            .arg("-vf")
            .arg(video_filter)
            .arg("-c:v")
            .arg("hevc_nvenc")
            .arg("-preset")
            .arg("p1")
            .arg("-tune")
            .arg("ll")
            .arg("-rc")
            .arg("cbr")
            .arg("-b:v")
            .arg(&bitrate)
            .arg("-maxrate")
            .arg(&bitrate)
            .arg("-bufsize")
            .arg(&bitrate)
            .arg("-g")
            .arg(keyframe_interval.to_string())
            .arg("-bf")
            .arg("0")
            .arg("-forced-idr")
            .arg("1")
            .arg("-f")
            .arg("hevc")
            .arg("pipe:1")
            .stdout(Stdio::piped())
            .stderr(Stdio::null());
        tracing::info!(
            device = dev_label,
            capture_width,
            capture_height,
            capture_fps,
            output_width = width,
            output_height = height,
            output_fps = fps,
            bitrate_kbit,
            keyframe_interval,
            "📸 using FFmpeg hevc_nvenc hardware encoder"
        );
        let mut child = command.spawn().context("starting FFmpeg hevc_nvenc camera encoder")?;
        let stdout = child
            .stdout
            .take()
            .context("FFmpeg hevc_nvenc stdout was not piped")?;
        // Ownership of the descriptor leaves Rust here; fdsrc reads it from now on.
        let fd = stdout.into_raw_fd();
        let desc = format!(
            "fdsrc fd={fd} blocksize=1048576 ! \
             h265parse config-interval=-1 ! \
             video/x-h265,stream-format=byte-stream,alignment=au ! \
             queue max-size-buffers=30 leaky=downstream ! \
             appsink name=asink emit-signals=true max-buffers=60 drop=true"
        );
        let pipeline: gst::Pipeline = match gst::parse::launch(&desc) {
            Ok(element) => element.downcast::<gst::Pipeline>().expect("not a pipeline"),
            Err(err) => {
                // Don't leave an orphaned encoder process behind on failure.
                let _ = child.kill();
                return Err(err).context("gst parse_launch(ffmpeg hevc nvenc)");
            }
        };
        let sink: gst_app::AppSink = pipeline
            .by_name("asink")
            .context("appsink element not found for FFmpeg HEVC route")?
            .downcast::<gst_app::AppSink>()
            .expect("appsink down-cast");
        spawn_camera_bus_logger(&pipeline, format!("{dev_label} via ffmpeg hevc_nvenc"));
        if let Err(err) = pipeline.set_state(gst::State::Playing) {
            let _ = pipeline.set_state(gst::State::Null);
            let _ = child.kill();
            return Err(err).context("starting FFmpeg HEVC GStreamer handoff pipeline");
        }
        Ok(Self {
            pipeline,
            sink,
            ffmpeg_child: Some(child),
            preview_tap_running: None,
            pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(),
            frame_duration_us: (1_000_000u64 / u64::from(fps.max(1))).max(1),
        })
    }

    /// Pull the next encoded packet from the appsink.
    ///
    /// Rebases the buffer's PTS onto the paced local capture clock. Returns
    /// `None` when no sample can be pulled/mapped, or when the rebaser clamps
    /// the buffer as stale (freshness-first: drop rather than send late).
    pub fn pull(&self) -> Option<VideoPacket> {
        let sample = self.sink.pull_sample().ok()?;
        let buf = sample.buffer()?;
        let map = buf.map_readable().ok()?;
        // GStreamer timestamps are nanoseconds; the rebaser works in microseconds.
        let source_pts_us = buf.pts().map(|ts| ts.nseconds() / 1_000);
        let packet_duration_us = buf
            .duration()
            .map(|ts| (ts.nseconds() / 1_000).max(1))
            .unwrap_or(self.frame_duration_us);
        let timing = self.pts_rebaser.rebase_with_packet_duration(
            source_pts_us,
            packet_duration_us,
            crate::live_capture_clock::upstream_source_lag_cap(),
        );
        if timing.lag_clamped {
            log_camera_stale_source_drop(timing, map.as_slice().len());
            return None;
        }
        let pts = timing.packet_pts_us;
        // Process-wide packet counter (shared by all captures in this process);
        // used only to gate the first-packet and sampled timing logs.
        static CAMERA_PACKET_COUNT: std::sync::atomic::AtomicU64 =
            std::sync::atomic::AtomicU64::new(0);
        let packet_index = CAMERA_PACKET_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        log_camera_first_packet(packet_index, map.as_slice().len(), pts);
        log_camera_timing_sample(packet_index, timing, map.as_slice().len());
        Some(VideoPacket {
            // id 2: camera stream — NOTE(review): confirm stream-id mapping
            // against the VideoPacket consumer.
            id: 2,
            pts,
            data: map.as_slice().to_vec(),
            ..Default::default()
        })
    }
}
/// Resolve the profile requested from the local webcam.
///
/// The server UVC contract is applied after capture. Keeping these separate
/// prevents a browser-facing 640x480/20 gadget mode from forcing a local webcam
/// to expose that exact mode when the selected camera quality is 720p/30.
fn resolved_capture_profile(cfg: Option<CameraConfig>) -> (u32, u32, u32) {
    // Per dimension: environment override wins, then the negotiated config,
    // then a hard-coded fallback. FPS is floored at 1 to avoid a zero rate.
    let width = env_u32("LESAVKA_CAM_WIDTH", cfg.map_or(1280, |c| c.width));
    let height = env_u32("LESAVKA_CAM_HEIGHT", cfg.map_or(720, |c| c.height));
    let fps = env_u32("LESAVKA_CAM_FPS", cfg.map_or(25, |c| c.fps)).max(1);
    (width, height, fps)
}
/// Resolve the profile emitted toward the remote UVC gadget.
fn resolved_output_profile(
    cfg: Option<CameraConfig>,
    capture_profile: (u32, u32, u32),
) -> (u32, u32, u32) {
    if let Some(cfg) = cfg {
        // Follow the server-negotiated profile unless the operator explicitly
        // opted into emitting the UI-selected capture profile instead.
        let follow_server = env_flag_enabled("LESAVKA_CAM_LOCK_TO_SERVER_PROFILE")
            || !env_flag_enabled("LESAVKA_CAM_EMIT_UI_PROFILE");
        if follow_server {
            return (cfg.width, cfg.height, cfg.fps.max(1));
        }
    }
    capture_profile
}
/// True when the environment variable `name` holds an affirmative value.
///
/// Unset/unreadable variables, empty or whitespace-only values, and the
/// case-insensitive tokens "0", "false", "no", "off" all count as disabled;
/// anything else enables the flag.
fn env_flag_enabled(name: &str) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return false;
    };
    let value = raw.trim();
    let disabled = value.is_empty()
        || ["0", "false", "no", "off"]
            .iter()
            .any(|token| value.eq_ignore_ascii_case(token));
    !disabled
}
/// Choose the live HEVC keyframe cadence.
///
/// Inputs: target FPS plus optional `LESAVKA_CAM_HEVC_KEYFRAME_INTERVAL`
/// override. Output: GOP length in frames. Why: the upstream relay is
/// freshness-first and may intentionally discard video; defaulting HEVC to
/// all-intra is less compression-efficient, but it turns packet loss into a
/// freeze/stutter instead of browser-visible block corruption.
fn hevc_keyframe_interval(fps: u32) -> u32 {
    // Never allow a zero-length GOP; clamp any override into [1, fps].
    let gop_ceiling = fps.max(1);
    env_u32("LESAVKA_CAM_HEVC_KEYFRAME_INTERVAL", 1).clamp(1, gop_ceiling)
}
/// Emit a one-time info line when the very first webcam packet flows.
fn log_camera_first_packet(packet_index: u64, bytes: usize, pts_us: u64) {
    // Only the first packet (index 0) is announced; everything else is silent.
    if packet_index != 0 {
        return;
    }
    tracing::info!(bytes, pts_us, "📸 upstream webcam frames flowing");
}
/// Gate for sampled timing logs: tracing must be enabled, and only the first
/// ten packets plus every 300th thereafter are logged.
fn should_log_camera_timing_sample(packet_index: u64) -> bool {
    if !crate::live_capture_clock::upstream_timing_trace_enabled() {
        return false;
    }
    packet_index < 10 || packet_index % 300 == 0
}
/// Emit a sampled structured log of the packet's timing rebase decision.
///
/// Sampling is delegated to `should_log_camera_timing_sample`; when the gate
/// is closed this is a no-op.
fn log_camera_timing_sample(
    packet_index: u64,
    timing: crate::live_capture_clock::RebasedSourcePts,
    bytes: usize,
) {
    // Guard first so the structured log below stays flat and scannable.
    if !should_log_camera_timing_sample(packet_index) {
        return;
    }
    tracing::info!(
        packet_index,
        source_pts_us = timing.source_pts_us.unwrap_or_default(),
        source_base_us = timing.source_base_us.unwrap_or_default(),
        capture_base_us = timing.capture_base_us.unwrap_or_default(),
        capture_now_us = timing.capture_now_us,
        packet_pts_us = timing.packet_pts_us,
        pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128,
        used_source_pts = timing.used_source_pts,
        lag_clamped = timing.lag_clamped,
        lead_clamped = timing.lead_clamped,
        bytes,
        "📸 upstream webcam timing sample"
    );
}
/// Rate-limited warning for webcam buffers discarded as stale before the
/// bundled uplink; every call increments a process-wide drop counter.
fn log_camera_stale_source_drop(timing: crate::live_capture_clock::RebasedSourcePts, bytes: usize) {
    static STALE_DROP_COUNT: std::sync::atomic::AtomicU64 =
        std::sync::atomic::AtomicU64::new(0);
    let drop_index = STALE_DROP_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    // Log the first ten drops, then every 300th, to keep the log readable.
    if drop_index >= 10 && drop_index % 300 != 0 {
        return;
    }
    tracing::warn!(
        drop_index,
        bytes,
        source_pts_us = timing.source_pts_us.unwrap_or_default(),
        capture_now_us = timing.capture_now_us,
        packet_pts_us = timing.packet_pts_us,
        "📸 dropping stale webcam source buffer before bundled uplink"
    );
}