fix(uvc): stream mjpeg from the control helper

This commit is contained in:
Brad Stein 2026-04-28 21:39:46 -03:00
parent 9b610425e1
commit 0ebb150ebe
7 changed files with 466 additions and 27 deletions

6
Cargo.lock generated
View File

@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.14.45"
version = "0.14.46"
dependencies = [
"anyhow",
"async-stream",
@ -1676,7 +1676,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.14.45"
version = "0.14.46"
dependencies = [
"anyhow",
"base64",
@ -1688,7 +1688,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.14.45"
version = "0.14.46"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.14.45"
version = "0.14.46"
edition = "2024"
[dependencies]

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.14.45"
version = "0.14.46"
edition = "2024"
build = "build.rs"

View File

@ -36,7 +36,7 @@ LESAVKA_UVC_WIDTH=${LESAVKA_UVC_WIDTH:-640}
LESAVKA_UVC_HEIGHT=${LESAVKA_UVC_HEIGHT:-480}
LESAVKA_UVC_CODEC=${INSTALL_UVC_CODEC}
LESAVKA_UVC_BLOCKING=${LESAVKA_UVC_BLOCKING:-1}
LESAVKA_UVC_CONTROL_READ_ONLY=${LESAVKA_UVC_CONTROL_READ_ONLY:-1}
LESAVKA_UVC_CONTROL_READ_ONLY=${LESAVKA_UVC_CONTROL_READ_ONLY:-0}
LESAVKA_UVC_MAXBURST=${LESAVKA_UVC_MAXBURST:-0}
EOF
}

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.14.45"
version = "0.14.46"
edition = "2024"
autobins = false

View File

@ -4,7 +4,8 @@ use anyhow::{Context, Result};
use std::env;
use std::fs::{File, OpenOptions};
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::{AsRawFd, RawFd};
use std::path::PathBuf;
use std::thread;
use std::time::Duration;
@ -41,6 +42,13 @@ const UVC_VS_PROBE_CONTROL: u8 = 0x01;
const UVC_VS_COMMIT_CONTROL: u8 = 0x02;
const UVC_VC_REQUEST_ERROR_CODE_CONTROL: u8 = 0x02;
const V4L2_BUF_TYPE_VIDEO_OUTPUT: u32 = 2;
const V4L2_MEMORY_MMAP: u32 = 1;
const V4L2_FIELD_NONE: u32 = 1;
const V4L2_PIX_FMT_MJPEG: u32 = u32::from_le_bytes(*b"MJPG");
const MAX_MJPEG_FRAME_BYTES: usize = 1024 * 1024;
const EMPTY_MJPEG_FRAME: &[u8] = &[0xff, 0xd8, 0xff, 0xd9];
#[repr(C)]
struct V4l2EventSubscription {
type_: u32,
@ -83,6 +91,89 @@ struct UvcRequestData {
data: [u8; UVC_DATA_SIZE],
}
#[repr(C)]
#[derive(Clone, Copy)]
struct V4l2PixFormat {
width: u32,
height: u32,
pixelformat: u32,
field: u32,
bytesperline: u32,
sizeimage: u32,
colorspace: u32,
priv_: u32,
flags: u32,
ycbcr_enc: u32,
quantization: u32,
xfer_func: u32,
}
#[repr(C)]
union V4l2FormatUnion {
pix: V4l2PixFormat,
raw_data: [u8; 200],
}
#[repr(C)]
struct V4l2Format {
type_: u32,
fmt: V4l2FormatUnion,
}
#[repr(C)]
#[derive(Clone, Copy)]
struct V4l2RequestBuffers {
count: u32,
type_: u32,
memory: u32,
capabilities: u32,
flags: u8,
reserved: [u8; 3],
}
#[repr(C)]
#[derive(Clone, Copy)]
struct V4l2Timecode {
type_: u32,
flags: u32,
frames: u8,
seconds: u8,
minutes: u8,
hours: u8,
userbits: [u8; 4],
}
#[repr(C)]
union V4l2BufferM {
offset: u32,
userptr: libc::c_ulong,
planes: *mut libc::c_void,
fd: i32,
}
#[repr(C)]
union V4l2BufferRequest {
request_fd: i32,
reserved: u32,
}
#[repr(C)]
struct V4l2Buffer {
index: u32,
type_: u32,
bytesused: u32,
flags: u32,
field: u32,
timestamp: libc::timeval,
timecode: V4l2Timecode,
sequence: u32,
memory: u32,
m: V4l2BufferM,
length: u32,
reserved2: u32,
request: V4l2BufferRequest,
}
#[derive(Clone, Copy)]
struct UvcConfig {
width: u32,
@ -133,6 +224,249 @@ struct ConfigfsSnapshot {
maxburst: u32,
}
struct MmapBuffer {
ptr: *mut u8,
len: usize,
}
struct UvcVideoStream {
fd: RawFd,
buffers: Vec<MmapBuffer>,
frame_path: PathBuf,
latest_frame: Vec<u8>,
streaming: bool,
}
impl UvcVideoStream {
fn new(fd: RawFd) -> Self {
Self {
fd,
buffers: Vec::new(),
frame_path: frame_spool_path(),
latest_frame: EMPTY_MJPEG_FRAME.to_vec(),
streaming: false,
}
}
fn start(&mut self, cfg: UvcConfig) -> Result<()> {
self.stop();
self.set_format(cfg)?;
self.request_buffers(4)?;
for index in 0..self.buffers.len() {
self.queue_buffer(index as u32)?;
}
let req = ioctl_write::<libc::c_int>(b'V', 18);
let mut type_ = V4L2_BUF_TYPE_VIDEO_OUTPUT as libc::c_int;
let rc = unsafe { libc::ioctl(self.fd, req, &mut type_) };
if rc < 0 {
return Err(std::io::Error::last_os_error()).context("VIDIOC_STREAMON");
}
self.streaming = true;
eprintln!(
"[lesavka-uvc] video stream started with {} buffers; frame_path={}",
self.buffers.len(),
self.frame_path.display()
);
Ok(())
}
fn stop(&mut self) {
if self.streaming {
let req = ioctl_write::<libc::c_int>(b'V', 19);
let mut type_ = V4L2_BUF_TYPE_VIDEO_OUTPUT as libc::c_int;
let rc = unsafe { libc::ioctl(self.fd, req, &mut type_) };
if rc < 0 {
eprintln!(
"[lesavka-uvc] VIDIOC_STREAMOFF failed: {}",
std::io::Error::last_os_error()
);
}
self.streaming = false;
}
if !self.buffers.is_empty() {
for buffer in self.buffers.drain(..) {
unsafe {
libc::munmap(buffer.ptr.cast(), buffer.len);
}
}
let _ = self.request_buffers(0);
}
}
fn pump(&mut self) -> Result<()> {
if !self.streaming {
return Ok(());
}
loop {
let mut buf = empty_v4l2_buffer();
let req = ioctl_readwrite::<V4l2Buffer>(b'V', 17);
let rc = unsafe { libc::ioctl(self.fd, req, &mut buf) };
if rc < 0 {
let err = std::io::Error::last_os_error();
match err.raw_os_error() {
Some(libc::EAGAIN) | Some(libc::EINTR) => return Ok(()),
_ => return Err(err).context("VIDIOC_DQBUF"),
}
}
self.queue_buffer(buf.index)?;
}
}
fn set_format(&self, cfg: UvcConfig) -> Result<()> {
let mut fmt = V4l2Format {
type_: V4L2_BUF_TYPE_VIDEO_OUTPUT,
fmt: V4l2FormatUnion {
pix: V4l2PixFormat {
width: cfg.width,
height: cfg.height,
pixelformat: V4L2_PIX_FMT_MJPEG,
field: V4L2_FIELD_NONE,
bytesperline: 0,
sizeimage: cfg.frame_size.max(cfg.width * cfg.height),
colorspace: 8,
priv_: 0,
flags: 0,
ycbcr_enc: 0,
quantization: 0,
xfer_func: 0,
},
},
};
let req = ioctl_readwrite::<V4l2Format>(b'V', 5);
let rc = unsafe { libc::ioctl(self.fd, req, &mut fmt) };
if rc < 0 {
return Err(std::io::Error::last_os_error()).context("VIDIOC_S_FMT");
}
Ok(())
}
fn request_buffers(&mut self, count: u32) -> Result<()> {
let mut rb = V4l2RequestBuffers {
count,
type_: V4L2_BUF_TYPE_VIDEO_OUTPUT,
memory: V4L2_MEMORY_MMAP,
capabilities: 0,
flags: 0,
reserved: [0; 3],
};
let req = ioctl_readwrite::<V4l2RequestBuffers>(b'V', 8);
let rc = unsafe { libc::ioctl(self.fd, req, &mut rb) };
if rc < 0 {
return Err(std::io::Error::last_os_error()).context("VIDIOC_REQBUFS");
}
if count == 0 {
return Ok(());
}
self.buffers.clear();
for index in 0..rb.count {
let mut buf = empty_v4l2_buffer();
buf.index = index;
let req = ioctl_readwrite::<V4l2Buffer>(b'V', 9);
let rc = unsafe { libc::ioctl(self.fd, req, &mut buf) };
if rc < 0 {
return Err(std::io::Error::last_os_error()).context("VIDIOC_QUERYBUF");
}
let offset = unsafe { buf.m.offset };
let ptr = unsafe {
libc::mmap(
std::ptr::null_mut(),
buf.length as usize,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_SHARED,
self.fd,
offset as libc::off_t,
)
};
if ptr == libc::MAP_FAILED {
return Err(std::io::Error::last_os_error()).context("mmap UVC buffer");
}
self.buffers.push(MmapBuffer {
ptr: ptr.cast(),
len: buf.length as usize,
});
}
Ok(())
}
fn queue_buffer(&mut self, index: u32) -> Result<()> {
self.refresh_latest_frame();
let Some(buffer) = self.buffers.get(index as usize) else {
return Ok(());
};
let bytes = self.latest_frame.len().min(buffer.len);
if bytes > 0 {
unsafe {
std::ptr::copy_nonoverlapping(self.latest_frame.as_ptr(), buffer.ptr, bytes);
}
} else {
unsafe {
std::ptr::write_bytes(buffer.ptr, 0, buffer.len);
}
}
let mut buf = empty_v4l2_buffer();
buf.index = index;
buf.bytesused = bytes as u32;
buf.length = buffer.len as u32;
let req = ioctl_readwrite::<V4l2Buffer>(b'V', 15);
let rc = unsafe { libc::ioctl(self.fd, req, &mut buf) };
if rc < 0 {
return Err(std::io::Error::last_os_error()).context("VIDIOC_QBUF");
}
Ok(())
}
fn refresh_latest_frame(&mut self) {
if let Ok(frame) = std::fs::read(&self.frame_path) {
if !frame.is_empty() && frame.len() <= MAX_MJPEG_FRAME_BYTES {
self.latest_frame = frame;
}
}
}
}
impl Drop for UvcVideoStream {
fn drop(&mut self) {
self.stop();
}
}
fn empty_v4l2_buffer() -> V4l2Buffer {
V4l2Buffer {
index: 0,
type_: V4L2_BUF_TYPE_VIDEO_OUTPUT,
bytesused: 0,
flags: 0,
field: V4L2_FIELD_NONE,
timestamp: libc::timeval {
tv_sec: 0,
tv_usec: 0,
},
timecode: V4l2Timecode {
type_: 0,
flags: 0,
frames: 0,
seconds: 0,
minutes: 0,
hours: 0,
userbits: [0; 4],
},
sequence: 0,
memory: V4L2_MEMORY_MMAP,
m: V4l2BufferM { offset: 0 },
length: 0,
reserved2: 0,
request: V4l2BufferRequest { reserved: 0 },
}
}
fn frame_spool_path() -> PathBuf {
env::var("LESAVKA_UVC_FRAME_PATH")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("/run/lesavka-uvc-frame.mjpg"))
}
fn main() -> Result<()> {
let (dev, cfg) = parse_args()?;
let _singleton = acquire_singleton_lock()?;
@ -144,8 +478,7 @@ fn main() -> Result<()> {
);
let debug = env::var("LESAVKA_UVC_DEBUG").is_ok();
let nonblock = env::var("LESAVKA_UVC_BLOCKING").is_err();
eprintln!("[lesavka-uvc] nonblock={}", if nonblock { 1 } else { 0 });
eprintln!("[lesavka-uvc] nonblock=1");
let mut setup_seen: u64 = 0;
let mut data_seen: u64 = 0;
let mut dq_err_seen: u64 = 0;
@ -190,8 +523,13 @@ fn main() -> Result<()> {
let mut state = UvcState::new(cfg);
let mut pending: Option<PendingRequest> = None;
let mut video = UvcVideoStream::new(fd);
loop {
if let Err(err) = video.pump() {
eprintln!("[lesavka-uvc] video pump failed: {err:#}");
}
let mut ev = unsafe { std::mem::zeroed::<V4l2Event>() };
let rc = unsafe { libc::ioctl(fd, vidioc_dqevent, &mut ev) };
if rc < 0 {
@ -211,6 +549,7 @@ fn main() -> Result<()> {
}
Some(libc::ENODEV) | Some(libc::EBADF) | Some(libc::EIO) => {
eprintln!("[lesavka-uvc] device reset ({err}); reopening");
video.stop();
break;
}
_ => {
@ -226,9 +565,20 @@ fn main() -> Result<()> {
let speed = u32::from_le_bytes(event_bytes(&ev)[0..4].try_into().unwrap());
eprintln!("[lesavka-uvc] UVC connect (speed={speed})");
}
UVC_EVENT_DISCONNECT => eprintln!("[lesavka-uvc] UVC disconnect"),
UVC_EVENT_STREAMON => eprintln!("[lesavka-uvc] stream on"),
UVC_EVENT_STREAMOFF => eprintln!("[lesavka-uvc] stream off"),
UVC_EVENT_DISCONNECT => {
eprintln!("[lesavka-uvc] UVC disconnect");
video.stop();
}
UVC_EVENT_STREAMON => {
eprintln!("[lesavka-uvc] stream on");
if let Err(err) = video.start(state.cfg) {
eprintln!("[lesavka-uvc] stream start failed: {err:#}");
}
}
UVC_EVENT_STREAMOFF => {
eprintln!("[lesavka-uvc] stream off");
video.stop();
}
UVC_EVENT_SETUP => {
let req = parse_ctrl_request(event_bytes(&ev));
setup_seen += 1;
@ -425,7 +775,6 @@ fn read_interface(path: &str) -> Option<u8> {
}
fn open_with_retry(path: &str) -> Result<std::fs::File> {
let nonblock = env::var("LESAVKA_UVC_BLOCKING").is_err();
let read_only = uvc_control_read_only();
for attempt in 1..=200 {
let mut opts = OpenOptions::new();
@ -433,9 +782,7 @@ fn open_with_retry(path: &str) -> Result<std::fs::File> {
if !read_only {
opts.write(true);
}
if nonblock {
opts.custom_flags(libc::O_NONBLOCK);
}
match opts.open(path) {
Ok(f) => {
let mode = if read_only { "ro" } else { "rw" };
@ -461,7 +808,7 @@ fn uvc_control_read_only() -> bool {
|| trimmed.eq_ignore_ascii_case("no")
|| trimmed.eq_ignore_ascii_case("off"))
})
.unwrap_or(true)
.unwrap_or(false)
}
fn acquire_singleton_lock() -> Result<File> {
@ -1104,6 +1451,15 @@ fn ioctl_write<T>(type_: u8, nr: u8) -> libc::c_ulong {
ioc(IOC_WRITE, type_, nr, std::mem::size_of::<T>() as u16)
}
fn ioctl_readwrite<T>(type_: u8, nr: u8) -> libc::c_ulong {
ioc(
IOC_READ | IOC_WRITE,
type_,
nr,
std::mem::size_of::<T>() as u16,
)
}
fn ioc(dir: u8, type_: u8, nr: u8, size: u16) -> libc::c_ulong {
let dir = (dir as u32) << IOC_DIRSHIFT;
let ty = (type_ as u32) << IOC_TYPESHIFT;

View File

@ -5,6 +5,7 @@ use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64};
use tracing::warn;
@ -23,6 +24,7 @@ pub struct WebcamSink {
clock_aligned: AtomicBool,
next_pts_us: AtomicU64,
frame_step_us: u64,
mjpeg_spool_path: Option<PathBuf>,
}
fn uvc_sink_session_clock_align_enabled() -> bool {
@ -39,20 +41,39 @@ fn uvc_sink_session_clock_align_enabled() -> bool {
}
fn uvc_mjpeg_v4l2sink_io_mode() -> String {
let value = std::env::var("LESAVKA_UVC_MJPEG_IO_MODE").unwrap_or_else(|_| "rw".to_string());
let value = std::env::var("LESAVKA_UVC_MJPEG_IO_MODE").unwrap_or_else(|_| "mmap".to_string());
let trimmed = value.trim().to_ascii_lowercase();
match trimmed.as_str() {
"auto" | "rw" | "mmap" | "userptr" | "dmabuf" | "dmabuf-import" => trimmed,
_ => {
warn!(
value,
"invalid LESAVKA_UVC_MJPEG_IO_MODE; falling back to rw"
"invalid LESAVKA_UVC_MJPEG_IO_MODE; falling back to mmap"
);
"rw".to_string()
"mmap".to_string()
}
}
}
fn mjpeg_spool_enabled() -> bool {
std::env::var("LESAVKA_UVC_MJPEG_SPOOL")
.ok()
.map(|value| {
let trimmed = value.trim();
!(trimmed.eq_ignore_ascii_case("0")
|| trimmed.eq_ignore_ascii_case("false")
|| trimmed.eq_ignore_ascii_case("no")
|| trimmed.eq_ignore_ascii_case("off"))
})
.unwrap_or(true)
}
fn mjpeg_spool_path() -> PathBuf {
std::env::var("LESAVKA_UVC_FRAME_PATH")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("/run/lesavka-uvc-frame.mjpg"))
}
impl WebcamSink {
/// Build a new webcam sink pipeline.
///
@ -92,6 +113,7 @@ impl WebcamSink {
clock_aligned: AtomicBool::new(!clock_align_enabled),
next_pts_us: AtomicU64::new(0),
frame_step_us,
mjpeg_spool_path: None,
})
}
@ -123,7 +145,31 @@ impl WebcamSink {
crate::media_timing::prepare_pipeline_clock_sync(&pipeline);
}
if use_mjpeg {
let mut mjpeg_spool_file = None;
if use_mjpeg && mjpeg_spool_enabled() {
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.field("colorimetry", "2:4:7:1")
.build();
src.set_caps(Some(&caps_mjpeg));
let sink = gst::ElementFactory::make("fakesink")
.build()
.context("building fakesink for MJPEG UVC spool")?;
if clock_align_enabled {
crate::media_timing::enable_sink_clock_sync(&sink);
} else if sink.has_property("sync", None) {
sink.set_property("sync", false);
}
pipeline.add_many([src.upcast_ref(), &sink])?;
gst::Element::link_many([src.upcast_ref(), &sink])?;
mjpeg_spool_file = Some(mjpeg_spool_path());
} else if use_mjpeg {
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
@ -141,8 +187,8 @@ impl WebcamSink {
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", uvc_dev)
.build()?;
// The control helper keeps the gadget node open for UVC requests.
// RW mode avoids the MMAP/REQBUFS path that conflicts with that fd.
// Kept as an emergency fallback; normal MJPEG output is brokered
// through the UVC helper so only one process owns the gadget node.
sink.set_property_from_str("io-mode", &uvc_mjpeg_v4l2sink_io_mode());
if clock_align_enabled {
crate::media_timing::enable_sink_clock_sync(&sink);
@ -212,6 +258,7 @@ impl WebcamSink {
clock_aligned: AtomicBool::new(!clock_align_enabled),
next_pts_us: AtomicU64::new(0),
frame_step_us,
mjpeg_spool_path: mjpeg_spool_file,
})
}
@ -229,6 +276,13 @@ impl WebcamSink {
#[cfg(not(coverage))]
pub fn push(&self, pkt: VideoPacket) {
if let Some(path) = &self.mjpeg_spool_path {
if let Err(err) = spool_mjpeg_frame(path, &pkt.data) {
warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool MJPEG frame for UVC helper");
}
return;
}
let mut buf = gst::Buffer::from_slice(pkt.data);
if let Some(meta) = buf.get_mut() {
let pts_us = reserve_local_pts(&self.next_pts_us, pkt.pts, self.frame_step_us);
@ -249,6 +303,17 @@ impl WebcamSink {
}
}
#[cfg(not(coverage))]
fn spool_mjpeg_frame(path: &Path, data: &[u8]) -> anyhow::Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let tmp = path.with_extension(format!("mjpg.{}.tmp", std::process::id()));
fs::write(&tmp, data)?;
fs::rename(&tmp, path)?;
Ok(())
}
impl Drop for WebcamSink {
fn drop(&mut self) {
let _ = self.pipe.set_state(gst::State::Null);
@ -275,9 +340,9 @@ mod tests {
}
#[test]
fn mjpeg_uvc_sink_defaults_to_rw_io_mode_with_safe_override() {
fn mjpeg_uvc_sink_defaults_to_mmap_io_mode_with_safe_override() {
temp_env::with_var_unset("LESAVKA_UVC_MJPEG_IO_MODE", || {
assert_eq!(super::uvc_mjpeg_v4l2sink_io_mode(), "rw");
assert_eq!(super::uvc_mjpeg_v4l2sink_io_mode(), "mmap");
});
temp_env::with_var("LESAVKA_UVC_MJPEG_IO_MODE", Some("mmap"), || {
@ -285,7 +350,25 @@ mod tests {
});
temp_env::with_var("LESAVKA_UVC_MJPEG_IO_MODE", Some("not-real"), || {
assert_eq!(super::uvc_mjpeg_v4l2sink_io_mode(), "rw");
assert_eq!(super::uvc_mjpeg_v4l2sink_io_mode(), "mmap");
});
}
#[test]
fn mjpeg_spool_defaults_on_with_path_override() {
temp_env::with_var_unset("LESAVKA_UVC_MJPEG_SPOOL", || {
assert!(super::mjpeg_spool_enabled());
});
temp_env::with_var("LESAVKA_UVC_MJPEG_SPOOL", Some("0"), || {
assert!(!super::mjpeg_spool_enabled());
});
temp_env::with_var("LESAVKA_UVC_FRAME_PATH", Some("/tmp/frame.mjpg"), || {
assert_eq!(
super::mjpeg_spool_path(),
std::path::PathBuf::from("/tmp/frame.mjpg")
);
});
}
}