lesavka/server/src/main.rs

438 lines
16 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! lesavka-server - **auto-cycle disabled**
// server/src/main.rs
#![forbid(unsafe_code)]
use anyhow::Context as _;
use futures_util::{Stream, StreamExt};
use gstreamer as gst;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{backtrace::Backtrace, panic, pin::Pin, process::Command, sync::Arc};
use tokio::{fs::OpenOptions, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use tonic_reflection::server::Builder as ReflBuilder;
use tracing::{debug, error, info, trace, warn};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use lesavka_common::lesavka::{
AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, ResetUsbReply, VideoPacket,
relay_server::{Relay, RelayServer},
};
use lesavka_server::{audio, gadget::UsbGadget, handshake::HandshakeSvc, video};
/*──────────────── constants ────────────────*/
/// Whether `Handler::new` cycles the USB gadget once before opening the HID endpoints.
/// **false** = never reset automatically at startup.
/// This flag only gates that initial reset; `recover_hid_if_needed` does not consult it.
const AUTO_CYCLE: bool = false;
/// Crate version from `CARGO_PKG_VERSION`, captured at compile time (logged at startup).
const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Crate name from `CARGO_PKG_NAME`, captured at compile time (logged at startup).
const PKG_NAME: &str = env!("CARGO_PKG_NAME");
/*──────────────── logging ───────────────────*/
fn init_tracing() -> anyhow::Result<WorkerGuard> {
let file = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open("/tmp/lesavka-server.log")?;
let (file_writer, guard) = tracing_appender::non_blocking(file);
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("lesavka_server=info,lesavka_server::video=warn"));
let filter_str = env_filter.to_string();
tracing_subscriber::registry()
.with(env_filter)
.with(fmt::layer().with_target(true).with_thread_ids(true))
.with(
fmt::layer()
.with_writer(file_writer)
.with_ansi(false)
.with_target(true)
.with_level(true),
)
.init();
tracing::info!("📜 effective RUST_LOG = \"{}\"", filter_str);
Ok(guard)
}
/*──────────────── helpers ───────────────────*/
/// Open `path` write-only with `O_NONBLOCK`, retrying while the device reports `EBUSY`.
///
/// Up to 200 attempts are made with a 50 ms pause after each busy result (about 10 s total).
///
/// # Errors
/// * Any open error other than `EBUSY` is returned immediately with the path as context.
/// * If every attempt reports `EBUSY`, a timeout error naming the path is returned.
async fn open_with_retry(path: &str) -> anyhow::Result<tokio::fs::File> {
    const MAX_ATTEMPTS: u32 = 200; // × 50 ms ≈ 10 s
    const BUSY_PAUSE: Duration = Duration::from_millis(50);

    let mut opts = OpenOptions::new();
    opts.write(true).custom_flags(libc::O_NONBLOCK);

    for attempt in 1..=MAX_ATTEMPTS {
        let e = match opts.open(path).await {
            Ok(f) => {
                info!("✅ {path} opened on attempt #{attempt}");
                return Ok(f);
            }
            Err(e) => e,
        };
        // Only "device busy" is worth waiting for; everything else is fatal.
        if e.raw_os_error() != Some(libc::EBUSY) {
            return Err(e).with_context(|| format!("opening {path}"));
        }
        trace!("⏳ {path} busy… retry #{attempt}");
        tokio::time::sleep(BUSY_PAUSE).await;
    }
    Err(anyhow::anyhow!("timeout waiting for {path}"))
}
/// Return the wall-clock instant of the next whole-minute boundary after "now".
///
/// The result is always strictly in the future: if the current time lies exactly on a
/// minute boundary, the following minute is returned.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn next_minute() -> SystemTime {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let completed_minutes = since_epoch.as_secs() / 60;
    UNIX_EPOCH + Duration::from_secs((completed_minutes + 1) * 60)
}
/// Start a background USB gadget recovery when a HID write failed with a
/// "transport gone" errno (`ENOTCONN`, `ESHUTDOWN` or `EPIPE`).
///
/// Any other error is ignored. At most one recovery runs at a time: `did_cycle` is set when
/// a recovery starts and cleared again 2 s after it finishes, and calls made while it is
/// set return immediately.
///
/// The recovery task cycles the gadget on the blocking thread pool, then reopens
/// `/dev/hidg0` and `/dev/hidg1` and swaps them into `kb` / `ms`. The reopen is attempted
/// even when the cycle failed. This function itself only spawns the task; it does not wait
/// for the recovery to complete.
async fn recover_hid_if_needed(
    err: &std::io::Error,
    gadget: UsbGadget,
    kb: Arc<Mutex<tokio::fs::File>>,
    ms: Arc<Mutex<tokio::fs::File>>,
    did_cycle: Arc<AtomicBool>,
) {
    let code = err.raw_os_error();
    if !matches!(
        code,
        Some(libc::ENOTCONN | libc::ESHUTDOWN | libc::EPIPE)
    ) {
        return;
    }

    // `swap` returns the previous value: `true` means a recovery is already in flight.
    if did_cycle.swap(true, Ordering::SeqCst) {
        return;
    }

    tokio::spawn(async move {
        warn!("🔁 HID transport down (errno={code:?}) - cycling gadget");

        let cycle_outcome = tokio::task::spawn_blocking(move || gadget.cycle()).await;
        match cycle_outcome {
            Err(e) => error!("💥 USB gadget cycle task panicked: {e:#}"),
            Ok(Err(e)) => error!("💥 USB gadget cycle failed: {e:#}"),
            Ok(Ok(())) => info!("✅ USB gadget cycle complete (auto-recover)"),
        }

        // Open both endpoints first so a failure leaves the current handles untouched.
        let reopened = async {
            let kb_new = open_with_retry("/dev/hidg0").await?;
            let ms_new = open_with_retry("/dev/hidg1").await?;
            *kb.lock().await = kb_new;
            *ms.lock().await = ms_new;
            Ok::<(), anyhow::Error>(())
        }
        .await;
        if let Err(e) = reopened {
            error!("💥 HID reopen failed: {e:#}");
        }

        // Hold the flag a little longer so queued write failures do not retrigger a cycle.
        tokio::time::sleep(Duration::from_secs(2)).await;
        did_cycle.store(false, Ordering::SeqCst);
    });
}
/// Choose the V4L2 node used for the UVC gadget.
///
/// Resolution order:
/// 1. the `LESAVKA_UVC_DEV` environment variable, returned verbatim when set;
/// 2. the first `video4linux` device reported by udev whose `ID_V4L_CAPABILITIES`
///    property contains `:video_output:` and which has a device node.
///
/// # Errors
/// Returns an error when neither source yields a node, rather than guessing a capture card.
/// udev failures (enumerator creation, scan) are treated as "nothing found".
fn pick_uvc_device() -> anyhow::Result<String> {
    if let Ok(path) = std::env::var("LESAVKA_UVC_DEV") {
        return Ok(path);
    }

    // Walk the video4linux devices via udev and look for an output-capable node.
    let discovered = udev::Enumerator::new().ok().and_then(|mut enumerator| {
        let _ = enumerator.match_subsystem("video4linux");
        let mut output_capable = enumerator.scan_devices().ok()?.filter(|dev| {
            dev.property_value("ID_V4L_CAPABILITIES")
                .and_then(|v| v.to_str())
                .map_or(false, |caps| caps.contains(":video_output:"))
        });
        // An output-capable device without a node is skipped, not treated as a match.
        output_capable
            .find_map(|dev| dev.devnode().map(|node| node.to_string_lossy().into_owned()))
    });

    discovered.ok_or_else(|| {
        anyhow::anyhow!("no video_output v4l2 node found; set LESAVKA_UVC_DEV")
    })
}
fn spawn_uvc_control(uvc_dev: &str) -> anyhow::Result<std::process::Child> {
let bin = std::env::var("LESAVKA_UVC_CTRL_BIN")
.unwrap_or_else(|_| "/usr/local/bin/lesavka-uvc".to_string());
Command::new(bin)
.arg("--device")
.arg(uvc_dev)
.spawn()
.context("spawning lesavka-uvc")
}
/*──────────────── Handler ───────────────────*/
/// State behind the gRPC `Relay` service: the two HID gadget endpoints plus the
/// USB gadget controller used to reset them.
struct Handler {
/// Keyboard HID endpoint (opened from `/dev/hidg0`); behind a mutex so it can be
/// replaced in place after the gadget is cycled.
kb: Arc<Mutex<tokio::fs::File>>,
/// Mouse HID endpoint (opened from `/dev/hidg1`); replaced together with `kb`.
ms: Arc<Mutex<tokio::fs::File>>,
/// USB gadget handle; `cycle()` is called on it to reset the gadget.
gadget: UsbGadget,
/// Set while an automatic recovery started by `recover_hid_if_needed` is in progress,
/// so that only one recovery runs at a time.
did_cycle: Arc<AtomicBool>,
}
impl Handler {
    /// Build the service state around `gadget` and open both HID endpoints.
    ///
    /// When `AUTO_CYCLE` is enabled the gadget is cycled once first; a failure of that
    /// cycle is deliberately ignored because the device may boot without a host attached.
    ///
    /// # Errors
    /// Fails if `/dev/hidg0` or `/dev/hidg1` cannot be opened (see `open_with_retry`).
    async fn new(gadget: UsbGadget) -> anyhow::Result<Self> {
        if !AUTO_CYCLE {
            info!("🛠️ AUTO_CYCLE disabled - no initial reset");
        } else {
            info!("🛠️ Initial USB reset…");
            // Best effort only: ignore failure - may boot without host.
            let _ = gadget.cycle();
        }

        info!("🛠️ opening HID endpoints …");
        let keyboard = open_with_retry("/dev/hidg0").await?;
        let mouse = open_with_retry("/dev/hidg1").await?;
        info!("✅ HID endpoints ready");

        let kb = Arc::new(Mutex::new(keyboard));
        let ms = Arc::new(Mutex::new(mouse));
        let did_cycle = Arc::new(AtomicBool::new(false));
        Ok(Self {
            kb,
            ms,
            gadget,
            did_cycle,
        })
    }

    /// Reopen both HID endpoints and swap them into the shared handles.
    ///
    /// Both files are opened before either handle is replaced, so an open failure leaves
    /// the existing handles in place.
    ///
    /// # Errors
    /// Propagates the first open failure.
    async fn reopen_hid(&self) -> anyhow::Result<()> {
        let fresh_kb = open_with_retry("/dev/hidg0").await?;
        let fresh_ms = open_with_retry("/dev/hidg1").await?;

        let mut kb_slot = self.kb.lock().await;
        *kb_slot = fresh_kb;
        drop(kb_slot);

        let mut ms_slot = self.ms.lock().await;
        *ms_slot = fresh_ms;
        Ok(())
    }
}
/*──────────────── gRPC service ─────────────*/
#[tonic::async_trait]
impl Relay for Handler {
/* existing streams ─ unchanged, except: no more auto-reset */
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send>>;
type CaptureAudioStream = Pin<Box<dyn Stream<Item = Result<AudioPacket, Status>> + Send>>;
type StreamMicrophoneStream = ReceiverStream<Result<Empty, Status>>;
type StreamCameraStream = ReceiverStream<Result<Empty, Status>>;
async fn stream_keyboard(
&self,
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(32);
let kb = self.kb.clone();
let ms = self.ms.clone();
let gadget = self.gadget.clone();
let did_cycle = self.did_cycle.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if let Err(e) = kb.lock().await.write_all(&pkt.data).await {
warn!("⌨️ write failed: {e} (dropped)");
recover_hid_if_needed(&e, gadget.clone(), kb.clone(), ms.clone(), did_cycle.clone())
.await;
}
tx.send(Ok(pkt)).await.ok();
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(1024);
let ms = self.ms.clone();
let kb = self.kb.clone();
let gadget = self.gadget.clone();
let did_cycle = self.did_cycle.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if let Err(e) = ms.lock().await.write_all(&pkt.data).await {
warn!("🖱️ write failed: {e} (dropped)");
recover_hid_if_needed(&e, gadget.clone(), kb.clone(), ms.clone(), did_cycle.clone())
.await;
}
tx.send(Ok(pkt)).await.ok();
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_microphone(
&self,
req: Request<tonic::Streaming<AudioPacket>>,
) -> Result<Response<Self::StreamMicrophoneStream>, Status> {
// 1 ─ build once, early
let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
info!(%uac_dev, "🎤 stream_microphone using UAC sink");
let mut sink = audio::Voice::new(&uac_dev)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
// 2 ─ dummy outbound stream (same trick as before)
let (tx, rx) = tokio::sync::mpsc::channel(1);
// 3 ─ drive the sink in a background task
tokio::spawn(async move {
let mut inbound = req.into_inner();
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
while let Some(pkt) = inbound.next().await.transpose()? {
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 {
tracing::trace!("🎤⬇ srv pkt#{n} {} bytes", pkt.data.len());
}
sink.push(&pkt);
}
sink.finish(); // flush on EOS
let _ = tx.send(Ok(Empty {})).await;
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_camera(
&self,
req: Request<tonic::Streaming<VideoPacket>>,
) -> Result<Response<Self::StreamCameraStream>, Status> {
// map gRPC camera id → UVC device
let uvc = pick_uvc_device().map_err(|e| Status::internal(format!("{e:#}")))?;
info!(%uvc, "🎥 stream_camera using UVC sink");
// build once
let relay =
video::CameraRelay::new(0, &uvc).map_err(|e| Status::internal(format!("{e:#}")))?;
// dummy outbound (same pattern as other streams)
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
relay.feed(pkt); // ← all logging inside video.rs
}
tx.send(Ok(Empty {})).await.ok();
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn capture_video(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
let id = req.into_inner().id;
let dev = match id {
0 => "/dev/lesavka_l_eye",
1 => "/dev/lesavka_r_eye",
_ => return Err(Status::invalid_argument("monitor id must be 0 or 1")),
};
debug!("🎥 streaming {dev}");
let s = video::eye_ball(dev, id, 6_000)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(Box::pin(s)))
}
async fn capture_audio(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureAudioStream>, Status> {
// Only one speaker stream for now; both 0/1 → same ALSA dev.
let _id = req.into_inner().id;
// Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging).
let dev = std::env::var("LESAVKA_ALSA_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
let s = audio::ear(&dev, 0)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(Box::pin(s)))
}
/*────────────── USB-reset RPC ────────────*/
async fn reset_usb(&self, _req: Request<Empty>) -> Result<Response<ResetUsbReply>, Status> {
info!("🔴 explicit ResetUsb() called");
match self.gadget.cycle() {
Ok(_) => {
if let Err(e) = self.reopen_hid().await {
error!("💥 reopen HID failed: {e:#}");
return Err(Status::internal(e.to_string()));
}
Ok(Response::new(ResetUsbReply { ok: true }))
}
Err(e) => {
error!("💥 cycle failed: {e:#}");
Err(Status::internal(e.to_string()))
}
}
}
}
/*──────────────── main ───────────────────────*/
/// Server entry point: set up logging and the panic hook, optionally start the UVC
/// control helper, open the HID endpoints, then serve gRPC on `0.0.0.0:50051`.
///
/// A missing UVC device or a helper that fails to start is only a warning; failing to
/// initialise logging, open the HID endpoints or run the server is fatal.
#[tokio::main(worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
    let _guard = init_tracing()?;
    info!("🚀 {} v{} starting up", PKG_NAME, VERSION);

    // Route panics through tracing so they land in the log file with a backtrace.
    panic::set_hook(Box::new(|p| {
        let bt = Backtrace::force_capture();
        error!("💥 panic: {p}\n{bt}");
    }));

    let gadget = UsbGadget::new("lesavka");

    // The child handle is kept for the rest of `main`.
    let _uvc_ctrl = if std::env::var("LESAVKA_DISABLE_UVC").is_ok() {
        info!("📷 UVC disabled (LESAVKA_DISABLE_UVC set)");
        None
    } else {
        match pick_uvc_device() {
            Err(e) => {
                warn!("⚠️ UVC device not ready: {e:#}");
                None
            }
            Ok(uvc_dev) => match spawn_uvc_control(&uvc_dev) {
                Err(e) => {
                    warn!("⚠️ failed to start lesavka-uvc: {e:#}");
                    None
                }
                Ok(child) => {
                    info!(%uvc_dev, "📷 UVC control helper started");
                    Some(child)
                }
            },
        }
    };

    let handler = Handler::new(gadget.clone()).await?;

    info!("🌐 lesavka-server listening on 0.0.0.0:50051");
    let listen_addr = std::net::SocketAddr::from(([0, 0, 0, 0], 50051));
    let relay_svc = RelayServer::new(handler);
    let handshake_svc = HandshakeSvc::server();
    let reflection_svc = ReflBuilder::configure().build_v1().unwrap();

    Server::builder()
        .tcp_nodelay(true)
        .max_frame_size(Some(2 * 1024 * 1024))
        .add_service(relay_svc)
        .add_service(handshake_svc)
        .add_service(reflection_svc)
        .serve(listen_addr)
        .await?;
    Ok(())
}