From 30eab21166d250ae3a2a8a36d6e0332a5ac23288 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 6 Jan 2026 04:38:41 -0300 Subject: [PATCH] server: add UVC control helper --- scripts/install/server.sh | 3 +- server/src/bin/lesavka-uvc.rs | 527 ++++++++++++++++++++++++++++++++++ server/src/main.rs | 93 +++++- 3 files changed, 618 insertions(+), 5 deletions(-) create mode 100644 server/src/bin/lesavka-uvc.rs diff --git a/scripts/install/server.sh b/scripts/install/server.sh index cb5ae9b..4978b38 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -131,10 +131,11 @@ else fi echo "==> 4b. Source build" -sudo -u "$ORIG_USER" bash -c "cd '$SRC_DIR/server' && cargo clean && cargo build --release" +sudo -u "$ORIG_USER" bash -c "cd '$SRC_DIR/server' && cargo clean && cargo build --release --bins" echo "==> 5. Install binaries" sudo install -Dm755 "$SRC_DIR/server/target/release/lesavka-server" /usr/local/bin/lesavka-server +sudo install -Dm755 "$SRC_DIR/server/target/release/lesavka-uvc" /usr/local/bin/lesavka-uvc sudo install -Dm755 "$SRC_DIR/scripts/daemon/lesavka-core.sh" /usr/local/bin/lesavka-core.sh echo "==> 6a. Systemd units - lesavka-core" diff --git a/server/src/bin/lesavka-uvc.rs b/server/src/bin/lesavka-uvc.rs new file mode 100644 index 0000000..74940b7 --- /dev/null +++ b/server/src/bin/lesavka-uvc.rs @@ -0,0 +1,527 @@ +// lesavka-uvc - minimal UVC control handler for the gadget node. + +use anyhow::{Context, Result}; +use std::env; +use std::fs::OpenOptions; +use std::os::unix::fs::OpenOptionsExt; +use std::os::unix::io::AsRawFd; +use std::thread; +use std::time::Duration; + +const STREAM_CTRL_SIZE: usize = 34; +const UVC_DATA_SIZE: usize = 60; + +const V4L2_EVENT_PRIVATE_START: u32 = 0x0800_0000; +const UVC_EVENT_CONNECT: u32 = V4L2_EVENT_PRIVATE_START + 0; +const UVC_EVENT_DISCONNECT: u32 = V4L2_EVENT_PRIVATE_START + 1; +const UVC_EVENT_STREAMON: u32 = V4L2_EVENT_PRIVATE_START + 2; +const UVC_EVENT_STREAMOFF: u32 = V4L2_EVENT_PRIVATE_START + 3; +const UVC_EVENT_SETUP: u32 = V4L2_EVENT_PRIVATE_START + 4; +const UVC_EVENT_DATA: u32 = V4L2_EVENT_PRIVATE_START + 5; + +const UVC_STRING_CONTROL_IDX: u8 = 0; +const UVC_STRING_STREAMING_IDX: u8 = 1; + +const USB_DIR_IN: u8 = 0x80; + +const UVC_SET_CUR: u8 = 0x01; +const UVC_GET_CUR: u8 = 0x81; +const UVC_GET_MIN: u8 = 0x82; +const UVC_GET_MAX: u8 = 0x83; +const UVC_GET_RES: u8 = 0x84; +const UVC_GET_LEN: u8 = 0x85; +const UVC_GET_INFO: u8 = 0x86; +const UVC_GET_DEF: u8 = 0x87; + +const UVC_VS_PROBE_CONTROL: u8 = 0x01; +const UVC_VS_COMMIT_CONTROL: u8 = 0x02; +const UVC_VC_REQUEST_ERROR_CODE_CONTROL: u8 = 0x02; + +#[repr(C)] +struct V4l2EventSubscription { + type_: u32, + id: u32, + flags: u32, + reserved: [u32; 5], +} + +#[repr(C)] +union V4l2EventUnion { + data: [u8; 64], + _align: u64, +} + +#[repr(C)] +struct V4l2Event { + type_: u32, + u: V4l2EventUnion, + pending: u32, + sequence: u32, + timestamp: libc::timespec, + id: u32, + reserved: [u32; 8], +} + +#[repr(C)] +#[derive(Clone, Copy)] +struct UsbCtrlRequest { + b_request_type: u8, + b_request: u8, + w_value: u16, + w_index: u16, + w_length: u16, +} + +#[repr(C)] +#[derive(Clone, Copy)] +struct UvcRequestData { + length: i32, + data: [u8; UVC_DATA_SIZE], +} + +#[derive(Clone, Copy)] +struct UvcConfig { + width: u32, + height: u32, + fps: u32, + interval: u32, + max_packet: u32, +} + +struct UvcState { + cfg: UvcConfig, + default: [u8; STREAM_CTRL_SIZE], + probe: [u8; STREAM_CTRL_SIZE], + commit: [u8; STREAM_CTRL_SIZE], +} + +#[derive(Clone, Copy)] +struct PendingRequest { + interface: u8, + selector: u8, +} + +fn main() -> Result<()> { + let (dev, cfg) = parse_args()?; + eprintln!("[lesavka-uvc] starting (dev={dev})"); + + loop { + let file = open_with_retry(&dev)?; + let fd = file.as_raw_fd(); + let vidioc_subscribe = ioctl_write::(b'V', 90); + let vidioc_dqevent = ioctl_read::(b'V', 89); + let uvc_send_response = ioctl_write::(b'U', 1); + + subscribe_event(fd, vidioc_subscribe, UVC_EVENT_SETUP)?; + subscribe_event(fd, vidioc_subscribe, UVC_EVENT_DATA)?; + let _ = subscribe_event(fd, vidioc_subscribe, UVC_EVENT_CONNECT); + let _ = subscribe_event(fd, vidioc_subscribe, UVC_EVENT_DISCONNECT); + let _ = subscribe_event(fd, vidioc_subscribe, UVC_EVENT_STREAMON); + let _ = subscribe_event(fd, vidioc_subscribe, UVC_EVENT_STREAMOFF); + + let mut state = UvcState::new(cfg); + let mut pending: Option = None; + + loop { + let mut ev = unsafe { std::mem::zeroed::() }; + let rc = unsafe { libc::ioctl(fd, vidioc_dqevent, &mut ev) }; + if rc < 0 { + let err = std::io::Error::last_os_error(); + match err.raw_os_error() { + Some(libc::EAGAIN) | Some(libc::EINTR) => { + thread::sleep(Duration::from_millis(10)); + continue; + } + Some(libc::ENODEV) | Some(libc::EBADF) | Some(libc::EIO) => { + eprintln!("[lesavka-uvc] device reset ({err}); reopening"); + break; + } + _ => { + eprintln!("[lesavka-uvc] dqevent failed: {err}"); + thread::sleep(Duration::from_millis(100)); + continue; + } + } + } + + match ev.type_ { + UVC_EVENT_CONNECT => { + let speed = u32::from_le_bytes(event_bytes(&ev)[0..4].try_into().unwrap()); + eprintln!("[lesavka-uvc] UVC connect (speed={speed})"); + } + UVC_EVENT_DISCONNECT => eprintln!("[lesavka-uvc] UVC disconnect"), + UVC_EVENT_STREAMON => eprintln!("[lesavka-uvc] stream on"), + UVC_EVENT_STREAMOFF => eprintln!("[lesavka-uvc] stream off"), + UVC_EVENT_SETUP => { + let req = parse_ctrl_request(event_bytes(&ev)); + handle_setup( + fd, + uvc_send_response, + &mut state, + &mut pending, + req, + ); + } + UVC_EVENT_DATA => { + let data = parse_request_data(event_bytes(&ev)); + handle_data(fd, uvc_send_response, &mut state, &mut pending, data); + } + _ => {} + } + } + } +} + +fn parse_args() -> Result<(String, UvcConfig)> { + let mut args = env::args().skip(1); + let mut dev: Option = None; + + while let Some(arg) = args.next() { + match arg.as_str() { + "--device" | "-d" => dev = args.next(), + _ => dev = Some(arg), + } + } + + let dev = dev + .or_else(|| env::var("LESAVKA_UVC_DEV").ok()) + .context("missing --device (or LESAVKA_UVC_DEV)")?; + + Ok((dev, UvcConfig::from_env())) +} + +impl UvcConfig { + fn from_env() -> Self { + let width = env_u32("LESAVKA_UVC_WIDTH", 640); + let height = env_u32("LESAVKA_UVC_HEIGHT", 480); + let fps = env_u32("LESAVKA_UVC_FPS", 30).max(1); + let interval = env_u32("LESAVKA_UVC_INTERVAL", 0); + let mut max_packet = env_u32("LESAVKA_UVC_MAXPACKET", 3072); + if env::var("LESAVKA_UVC_BULK").is_ok() { + max_packet = max_packet.min(1024); + } + + let interval = if interval == 0 { + 10_000_000 / fps + } else { + interval + }; + + Self { + width, + height, + fps, + interval, + max_packet, + } + } +} + +impl UvcState { + fn new(cfg: UvcConfig) -> Self { + let default = build_streaming_control(&cfg); + Self { + cfg, + default, + probe: default, + commit: default, + } + } +} + +fn open_with_retry(path: &str) -> Result { + for attempt in 1..=200 { + match OpenOptions::new() + .read(true) + .write(true) + .custom_flags(libc::O_NONBLOCK) + .open(path) + { + Ok(f) => { + eprintln!("[lesavka-uvc] opened {path} (attempt {attempt})"); + return Ok(f); + } + Err(err) if err.raw_os_error() == Some(libc::ENOENT) => { + thread::sleep(Duration::from_millis(50)); + } + Err(err) => return Err(err).with_context(|| format!("open {path}")), + } + } + Err(anyhow::anyhow!("timeout opening {path}")) +} + +fn subscribe_event(fd: i32, req: libc::c_ulong, event: u32) -> Result<()> { + let mut sub = V4l2EventSubscription { + type_: event, + id: 0, + flags: 0, + reserved: [0; 5], + }; + let rc = unsafe { libc::ioctl(fd, req, &mut sub) }; + if rc < 0 { + return Err(std::io::Error::last_os_error()).with_context(|| { + format!("subscribe event {event:#x} (fd={fd})") + }); + } + Ok(()) +} + +fn handle_setup( + fd: i32, + uvc_send_response: libc::c_ulong, + state: &mut UvcState, + pending: &mut Option, + req: UsbCtrlRequest, +) { + let interface = (req.w_index & 0xff) as u8; + let selector = (req.w_value >> 8) as u8; + let is_in = (req.b_request_type & USB_DIR_IN) != 0; + + if !is_in && req.b_request == UVC_SET_CUR { + *pending = Some(PendingRequest { interface, selector }); + return; + } + + if !is_in { + let _ = send_stall(fd, uvc_send_response); + return; + } + + let payload = build_in_response(state, interface, selector, req.b_request); + match payload { + Some(bytes) => { + let _ = send_response(fd, uvc_send_response, &bytes); + } + None => { + let _ = send_stall(fd, uvc_send_response); + } + } +} + +fn handle_data( + fd: i32, + uvc_send_response: libc::c_ulong, + state: &mut UvcState, + pending: &mut Option, + data: UvcRequestData, +) { + let Some(p) = pending.take() else { + let _ = send_response(fd, uvc_send_response, &[]); + return; + }; + + if data.length < 0 { + let _ = send_stall(fd, uvc_send_response); + return; + } + + let len = data.length as usize; + let slice = &data.data[..len.min(data.data.len())]; + + if p.interface == UVC_STRING_STREAMING_IDX + && matches!(p.selector, UVC_VS_PROBE_CONTROL | UVC_VS_COMMIT_CONTROL) + { + let sanitized = sanitize_streaming_control(slice, state); + if p.selector == UVC_VS_PROBE_CONTROL { + state.probe = sanitized; + } else { + state.commit = sanitized; + } + } + + let _ = send_response(fd, uvc_send_response, &[]); +} + +fn build_in_response( + state: &UvcState, + interface: u8, + selector: u8, + request: u8, +) -> Option> { + match interface { + UVC_STRING_STREAMING_IDX => build_streaming_response(state, selector, request), + UVC_STRING_CONTROL_IDX => build_control_response(selector, request), + _ => None, + } +} + +fn build_streaming_response( + state: &UvcState, + selector: u8, + request: u8, +) -> Option> { + let current = match selector { + UVC_VS_PROBE_CONTROL => state.probe, + UVC_VS_COMMIT_CONTROL => state.commit, + _ => return None, + }; + + match request { + UVC_GET_INFO => Some(vec![0x03]), + UVC_GET_LEN => Some((STREAM_CTRL_SIZE as u16).to_le_bytes().to_vec()), + UVC_GET_CUR => Some(current.to_vec()), + UVC_GET_MIN | UVC_GET_MAX | UVC_GET_DEF | UVC_GET_RES => Some(state.default.to_vec()), + _ => None, + } +} + +fn build_control_response(selector: u8, request: u8) -> Option> { + match request { + UVC_GET_INFO => Some(vec![0x01]), + UVC_GET_LEN => Some(1u16.to_le_bytes().to_vec()), + UVC_GET_CUR | UVC_GET_MIN | UVC_GET_MAX | UVC_GET_DEF | UVC_GET_RES => { + if selector == UVC_VC_REQUEST_ERROR_CODE_CONTROL { + Some(vec![0x00]) + } else { + Some(vec![0x00]) + } + } + _ => None, + } +} + +fn sanitize_streaming_control(data: &[u8], state: &UvcState) -> [u8; STREAM_CTRL_SIZE] { + let mut out = state.default; + if data.len() >= STREAM_CTRL_SIZE { + let format_index = data[2]; + let frame_index = data[3]; + let interval = read_le32(data, 4); + + if format_index == 1 { + out[2] = 1; + } + if frame_index == 1 { + out[3] = 1; + } + if interval == state.cfg.interval { + write_le32(&mut out[4..8], interval); + } + } + out +} + +fn send_response(fd: i32, req: libc::c_ulong, payload: &[u8]) -> Result<()> { + let mut resp = UvcRequestData { + length: payload.len() as i32, + data: [0u8; UVC_DATA_SIZE], + }; + let n = payload.len().min(UVC_DATA_SIZE); + resp.data[..n].copy_from_slice(&payload[..n]); + + let rc = unsafe { libc::ioctl(fd, req, &resp) }; + if rc < 0 { + return Err(std::io::Error::last_os_error()).context("UVCIOC_SEND_RESPONSE"); + } + Ok(()) +} + +fn send_stall(fd: i32, req: libc::c_ulong) -> Result<()> { + let resp = UvcRequestData { + length: -1, + data: [0u8; UVC_DATA_SIZE], + }; + let rc = unsafe { libc::ioctl(fd, req, &resp) }; + if rc < 0 { + return Err(std::io::Error::last_os_error()).context("UVCIOC_SEND_RESPONSE(stall)"); + } + Ok(()) +} + +fn build_streaming_control(cfg: &UvcConfig) -> [u8; STREAM_CTRL_SIZE] { + let mut buf = [0u8; STREAM_CTRL_SIZE]; + let frame_size = cfg.width * cfg.height * 2; + + write_le16(&mut buf[0..2], 1); // bmHint: dwFrameInterval + buf[2] = 1; // bFormatIndex + buf[3] = 1; // bFrameIndex + write_le32(&mut buf[4..8], cfg.interval); + write_le16(&mut buf[8..10], 0); + write_le16(&mut buf[10..12], 0); + write_le16(&mut buf[12..14], 0); + write_le16(&mut buf[14..16], 0); + write_le16(&mut buf[16..18], 0); + write_le32(&mut buf[18..22], frame_size); + write_le32(&mut buf[22..26], cfg.max_packet); + write_le32(&mut buf[26..30], 48_000_000); + buf[30] = 0; + buf[31] = 0; + buf[32] = 0; + buf[33] = 0; + + buf +} + +fn event_bytes(ev: &V4l2Event) -> [u8; 64] { + unsafe { ev.u.data } +} + +fn parse_ctrl_request(data: [u8; 64]) -> UsbCtrlRequest { + UsbCtrlRequest { + b_request_type: data[0], + b_request: data[1], + w_value: u16::from_le_bytes([data[2], data[3]]), + w_index: u16::from_le_bytes([data[4], data[5]]), + w_length: u16::from_le_bytes([data[6], data[7]]), + } +} + +fn parse_request_data(data: [u8; 64]) -> UvcRequestData { + let length = i32::from_le_bytes([data[0], data[1], data[2], data[3]]); + let mut out = [0u8; UVC_DATA_SIZE]; + out.copy_from_slice(&data[4..64]); + UvcRequestData { length, data: out } +} + +fn env_u32(name: &str, default: u32) -> u32 { + env::var(name) + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(default) +} + +fn write_le16(dst: &mut [u8], val: u16) { + let bytes = val.to_le_bytes(); + dst[0] = bytes[0]; + dst[1] = bytes[1]; +} + +fn write_le32(dst: &mut [u8], val: u32) { + let bytes = val.to_le_bytes(); + dst[0] = bytes[0]; + dst[1] = bytes[1]; + dst[2] = bytes[2]; + dst[3] = bytes[3]; +} + +fn read_le32(src: &[u8], offset: usize) -> u32 { + u32::from_le_bytes([src[offset], src[offset + 1], src[offset + 2], src[offset + 3]]) +} + +const IOC_NRBITS: u8 = 8; +const IOC_TYPEBITS: u8 = 8; +const IOC_SIZEBITS: u8 = 14; +const IOC_DIRBITS: u8 = 2; + +const IOC_NRSHIFT: u8 = 0; +const IOC_TYPESHIFT: u8 = IOC_NRSHIFT + IOC_NRBITS; +const IOC_SIZESHIFT: u8 = IOC_TYPESHIFT + IOC_TYPEBITS; +const IOC_DIRSHIFT: u8 = IOC_SIZESHIFT + IOC_SIZEBITS; + +const IOC_READ: u8 = 2; +const IOC_WRITE: u8 = 1; + +fn ioctl_read(type_: u8, nr: u8) -> libc::c_ulong { + ioc(IOC_READ, type_, nr, std::mem::size_of::() as u16) +} + +fn ioctl_write(type_: u8, nr: u8) -> libc::c_ulong { + ioc(IOC_WRITE, type_, nr, std::mem::size_of::() as u16) +} + +fn ioc(dir: u8, type_: u8, nr: u8, size: u16) -> libc::c_ulong { + let dir = (dir as u32) << IOC_DIRSHIFT; + let ty = (type_ as u32) << IOC_TYPESHIFT; + let nr = (nr as u32) << IOC_NRSHIFT; + let size = (size as u32) << IOC_SIZESHIFT; + (dir | ty | nr | size) as libc::c_ulong +} diff --git a/server/src/main.rs b/server/src/main.rs index 9a774a0..8d384bb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -5,9 +5,9 @@ use anyhow::Context as _; use futures_util::{Stream, StreamExt}; use gstreamer as gst; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc}; +use std::{backtrace::Backtrace, panic, pin::Pin, process::Command, sync::Arc}; use tokio::{fs::OpenOptions, io::AsyncWriteExt, sync::Mutex}; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Server; @@ -89,6 +89,50 @@ fn next_minute() -> SystemTime { UNIX_EPOCH + Duration::from_secs(next) } +async fn recover_hid_if_needed( + err: &std::io::Error, + gadget: UsbGadget, + kb: Arc>, + ms: Arc>, + did_cycle: Arc, +) { + let code = err.raw_os_error(); + let should_recover = matches!(code, Some(libc::ENOTCONN) | Some(libc::ESHUTDOWN) | Some(libc::EPIPE)); + if !should_recover { + return; + } + if did_cycle + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + return; + } + + tokio::spawn(async move { + warn!("🔁 HID transport down (errno={code:?}) - cycling gadget"); + match tokio::task::spawn_blocking(move || gadget.cycle()).await { + Ok(Ok(())) => info!("✅ USB gadget cycle complete (auto-recover)"), + Ok(Err(e)) => error!("💥 USB gadget cycle failed: {e:#}"), + Err(e) => error!("💥 USB gadget cycle task panicked: {e:#}"), + } + + if let Err(e) = async { + let kb_new = open_with_retry("/dev/hidg0").await?; + let ms_new = open_with_retry("/dev/hidg1").await?; + *kb.lock().await = kb_new; + *ms.lock().await = ms_new; + Ok::<(), anyhow::Error>(()) + } + .await + { + error!("💥 HID reopen failed: {e:#}"); + } + + tokio::time::sleep(Duration::from_secs(2)).await; + did_cycle.store(false, Ordering::SeqCst); + }); +} + /// Pick the UVC gadget video node. /// Priority: 1) `LESAVKA_UVC_DEV` override; 2) first `video_output` node. /// Returns an error when nothing matches instead of guessing a capture card. @@ -120,12 +164,22 @@ fn pick_uvc_device() -> anyhow::Result { )) } +fn spawn_uvc_control(uvc_dev: &str) -> anyhow::Result { + let bin = std::env::var("LESAVKA_UVC_CTRL_BIN") + .unwrap_or_else(|_| "/usr/local/bin/lesavka-uvc".to_string()); + Command::new(bin) + .arg("--device") + .arg(uvc_dev) + .spawn() + .context("spawning lesavka-uvc") +} + /*──────────────── Handler ───────────────────*/ struct Handler { kb: Arc>, ms: Arc>, gadget: UsbGadget, - did_cycle: AtomicBool, + did_cycle: Arc, } impl Handler { @@ -146,7 +200,7 @@ impl Handler { kb: Arc::new(Mutex::new(kb)), ms: Arc::new(Mutex::new(ms)), gadget, - did_cycle: AtomicBool::new(false), + did_cycle: Arc::new(AtomicBool::new(false)), }) } @@ -176,12 +230,17 @@ impl Relay for Handler { ) -> Result, Status> { let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); + let ms = self.ms.clone(); + let gadget = self.gadget.clone(); + let did_cycle = self.did_cycle.clone(); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if let Err(e) = kb.lock().await.write_all(&pkt.data).await { warn!("⌨️ write failed: {e} (dropped)"); + recover_hid_if_needed(&e, gadget.clone(), kb.clone(), ms.clone(), did_cycle.clone()) + .await; } tx.send(Ok(pkt)).await.ok(); } @@ -197,12 +256,17 @@ impl Relay for Handler { ) -> Result, Status> { let (tx, rx) = tokio::sync::mpsc::channel(1024); let ms = self.ms.clone(); + let kb = self.kb.clone(); + let gadget = self.gadget.clone(); + let did_cycle = self.did_cycle.clone(); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if let Err(e) = ms.lock().await.write_all(&pkt.data).await { warn!("🖱️ write failed: {e} (dropped)"); + recover_hid_if_needed(&e, gadget.clone(), kb.clone(), ms.clone(), did_cycle.clone()) + .await; } tx.send(Ok(pkt)).await.ok(); } @@ -337,6 +401,27 @@ async fn main() -> anyhow::Result<()> { })); let gadget = UsbGadget::new("lesavka"); + let _uvc_ctrl = if std::env::var("LESAVKA_DISABLE_UVC").is_err() { + match pick_uvc_device() { + Ok(uvc_dev) => match spawn_uvc_control(&uvc_dev) { + Ok(child) => { + info!(%uvc_dev, "📷 UVC control helper started"); + Some(child) + } + Err(e) => { + warn!("⚠️ failed to start lesavka-uvc: {e:#}"); + None + } + }, + Err(e) => { + warn!("⚠️ UVC device not ready: {e:#}"); + None + } + } + } else { + info!("📷 UVC disabled (LESAVKA_DISABLE_UVC set)"); + None + }; let handler = Handler::new(gadget.clone()).await?; info!("🌐 lesavka-server listening on 0.0.0.0:50051");