lesavka/server/src/main.rs
2025-06-26 20:38:55 -05:00

323 lines
12 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! lesavka-server
// sever/src/main.rs
#![forbid(unsafe_code)]
use std::sync::atomic::{AtomicBool, Ordering};
use std::{panic, backtrace::Backtrace, pin::Pin, sync::Arc, time::Duration};
use futures_util::{Stream, StreamExt};
use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::{wrappers::ReceiverStream};
use tonic::{transport::Server, Request, Response, Status};
use anyhow::Context as _;
use tracing::{info, trace, warn, error};
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use tracing_appender::non_blocking;
use tracing_appender::non_blocking::WorkerGuard;
use lesavka_server::{usb_gadget::UsbGadget, video};
use lesavka_common::lesavka::{
relay_server::{Relay, RelayServer},
KeyboardReport, MouseReport,
MonitorRequest, VideoPacket,
};
// ───────────────── helper ─────────────────────
fn init_tracing() -> anyhow::Result<WorkerGuard> {
// 1. create file writer + guard
let file = std::fs::OpenOptions::new()
.create(true).write(true).append(true)
.open("/tmp/lesavka-server.log")?;
let (file_writer, guard) = non_blocking(file);
// 2. build subscriber once
let env = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| {EnvFilter::new("lesavka_server=info")});
let console_layer = fmt::layer()
.with_target(true)
.with_thread_ids(true);
let file_layer = fmt::layer()
.with_writer(file_writer)
.with_ansi(false);
tracing_subscriber::registry()
.with(env)
.with(console_layer)
.with(file_layer)
.init();
Ok(guard)
}
async fn open_with_retry(path: &str) -> anyhow::Result<tokio::fs::File> {
const MAX_ATTEMPTS: usize = 200; // ≈ 10s (@50ms)
for attempt in 1..=MAX_ATTEMPTS {
match OpenOptions::new()
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)
.await
{
Ok(f) => {
info!("✅ {path} opened on attempt #{attempt}");
return Ok(f);
}
Err(e) if e.raw_os_error() == Some(libc::EBUSY) => {
trace!("⏳ {path} busy, retry #{attempt}");
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(e) => return Err(e).with_context(|| format!("💥 opening {path}")),
}
}
Err(anyhow::anyhow!("💥 timeout waiting for {path} to become available"))
}
fn owners_of(path: &str) -> String {
use std::{fs, os::unix::fs::MetadataExt};
let Ok(target_ino) = fs::metadata(path).map(|m| m.ino()) else { return "-".into() };
let mut pids = Vec::new();
if let Ok(entries) = fs::read_dir("/proc") {
for e in entries.flatten() {
let file_name = e.file_name();
let pid = file_name.to_string_lossy().into_owned();
if !pid.chars().all(|c| c.is_ascii_digit()) { continue }
let fd_dir = e.path().join("fd");
for fd in fs::read_dir(fd_dir).into_iter().flatten().flatten() {
if let Ok(meta) = fd.metadata() {
if meta.ino() == target_ino {
pids.push(pid.to_string());
break;
}
}
}
}
}
if pids.is_empty() { "-".into() } else { pids.join(",") }
}
async fn wait_configured(ctrl: &str, limit_ms: u64) -> anyhow::Result<()> {
for _ in 0..=limit_ms/50 {
let s = UsbGadget::state(ctrl)?;
if s.trim() == "configured" { return Ok(()) }
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(anyhow::anyhow!("host never configured"))
}
/*─────────────────── tonic service ─────────────────────*/
struct Handler {
kb: Arc<Mutex<tokio::fs::File>>,
ms: Arc<Mutex<tokio::fs::File>>,
gadget: UsbGadget,
did_cycle: AtomicBool,
}
impl Handler {
async fn make(gadget: UsbGadget) -> anyhow::Result<Self> {
info!("🛠️ Handler::make - cycling gadget ...");
gadget.cycle()?;
let ctrl = UsbGadget::find_controller()?;
wait_configured(&ctrl, 10_000).await
.context("waiting for host to configure")?;
let state = UsbGadget::wait_state_any(&ctrl, 5_000)?;
match state.as_str() {
"configured" => info!("✅ host enumerated (configured)"),
"not attached" => warn!("⚠️ host absent HID writes will be queued"),
_ => warn!("⚠️ unexpected UDC state: {state}"),
}
tokio::time::sleep(Duration::from_millis(1000)).await;
info!("🛠️ opening HID endpoints ...");
let kb = open_with_retry("/dev/hidg0").await?;
let ms = open_with_retry("/dev/hidg1").await?;
info!("✅ HID endpoints ready");
Ok(Self {
kb: Arc::new(Mutex::new(kb)),
ms: Arc::new(Mutex::new(ms)),
gadget,
did_cycle: AtomicBool::new(true),
})
}
}
#[tonic::async_trait]
impl Relay for Handler {
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send + Sync + 'static>>;
async fn stream_keyboard(
&self,
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
// self.gadget.cycle().map_err(|e| Status::internal(e.to_string()))?;
if !self.did_cycle.swap(true, Ordering::SeqCst) {
self.gadget
.cycle()
.map_err(|e| Status::internal(e.to_string()))?;
tokio::time::sleep(Duration::from_millis(500)).await;
}
let (tx, rx) =
tokio::sync::mpsc::channel::<Result<KeyboardReport, Status>>(32);
let kb = self.kb.clone();
let gadget = self.gadget.clone();
let ctrl = UsbGadget::find_controller().unwrap_or_default();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
/* try to write once */
let mut guard = kb.lock().await;
if let Err(e) = guard.write_all(&pkt.data).await {
/* host vanished ? */
if matches!(e.raw_os_error(),
Some(libc::ESHUTDOWN)|Some(libc::ENODEV)|Some(libc::EPIPE)) {
warn!("host disappeared recycling gadget");
gadget.cycle().map_err(|e| Status::internal(e.to_string()))?;
wait_configured(&ctrl, 10_000).await
.map_err(|e| Status::internal(e.to_string()))?;
/* reopen endpoint & swap into mutex */
*guard = open_with_retry("/dev/hidg0").await
.map_err(|e| Status::internal(e.to_string()))?;
} else {
return Err(Status::internal(e.to_string()));
}
}
drop(guard); /* release lock before await */
tx.send(Ok(pkt)).await.ok(); /* besteffort echo */
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
let (tx, rx) =
tokio::sync::mpsc::channel::<Result<MouseReport, Status>>(4096);
let ms = self.ms.clone();
let gadget = self.gadget.clone();
let ctrl = UsbGadget::find_controller().unwrap_or_default();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
/* try to write once */
let mut guard = ms.lock().await;
if let Err(e) = guard.write_all(&pkt.data).await {
/* host vanished ? */
if matches!(e.raw_os_error(),
Some(libc::ESHUTDOWN)|Some(libc::ENODEV)|Some(libc::EPIPE)) {
warn!("host disappeared recycling gadget");
gadget.cycle().map_err(|e| Status::internal(e.to_string()))?;
wait_configured(&ctrl, 10_000).await
.map_err(|e| Status::internal(e.to_string()))?;
/* reopen endpoint & swap into mutex */
*guard = open_with_retry("/dev/hidg1").await
.map_err(|e| Status::internal(e.to_string()))?;
} else {
return Err(Status::internal(e.to_string()));
}
}
drop(guard); /* release lock before await */
tx.send(Ok(pkt)).await.ok(); /* besteffort echo */
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn capture_video(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
let r = req.into_inner();
// let devs = loop {
// let list = list_gc311_devices()
// .map_err(|e| Status::internal(format!("enum v4l2: {e}")))?;
// if !list.is_empty() { break list; }
// tokio::time::sleep(Duration::from_secs(1)).await;
// };
// let dev = devs
// .get(r.id as usize)
// .ok_or_else(|| Status::invalid_argument(format!("monitor id {} absent", r.id)))?
// .to_owned();
let dev = match r.id {
0 => "/dev/lesavka_l_eye",
1 => "/dev/lesavka_r_eye",
_ => return Err(Status::invalid_argument("monitor id must be 0 or 1")),
}
.to_string();
info!("🎥 streaming {dev} at ≤{} kb/s", r.max_bitrate);
let s = video::spawn_camera(&dev, r.id, r.max_bitrate)
.await
.map_err(|e| Status::internal(format!("{e:#?}")))?;
Ok(Response::new(Box::pin(s) as _))
}
}
/*─────────────────── main ──────────────────────────────*/
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
/* logging */
let _log_guard: WorkerGuard = init_tracing()?;
panic::set_hook(Box::new(|p| {
let bt = Backtrace::force_capture();
error!("💥 panic: {p}\n{bt}");
}));
/* autocycle task */
// tokio::spawn(async { monitor_gc311_disconnect().await.ok(); });
let gadget = UsbGadget::new("lesavka");
let handler = match Handler::make(gadget.clone()).await {
Ok(h) => h,
Err(e) => {
error!("💥 failed to create Handler: {e:#}");
std::process::exit(1);
}
};
// tokio::spawn({
// let gadget = gadget.clone();
// async move {
// loop {
// tokio::time::sleep(Duration::from_secs(4)).await;
// if LAST_HID_WRITE.elapsed().as_secs() > 3 {
// warn!("no HID traffic in 3 s - cycling UDC");
// let _ = gadget.cycle();
// }
// }
// }
// });
info!("🌐 lesavkaserver listening on 0.0.0.0:50051");
if let Err(e) = Server::builder()
.tcp_nodelay(true)
.max_frame_size(Some(256 * 1024))
.add_service(RelayServer::new(handler))
.serve(([0, 0, 0, 0], 50051).into())
.await
{
error!("💥 gRPC server exited: {e:#}");
std::process::exit(1);
}
Ok(())
}