lesavka/server/src/main.rs
2025-06-25 20:36:42 -05:00

308 lines
11 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! lesavka-server
// server/src/main.rs
#![forbid(unsafe_code)]
use std::sync::atomic::{AtomicBool, Ordering};
use std::{panic, backtrace::Backtrace, pin::Pin, sync::Arc, time::Duration};
use futures_util::{Stream, StreamExt};
use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::{wrappers::ReceiverStream};
use tonic::{transport::Server, Request, Response, Status};
use anyhow::Context as _;
use tracing::{info, trace, warn, error};
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use tracing_appender::non_blocking;
use tracing_appender::non_blocking::WorkerGuard;
use udev::{MonitorBuilder};
use lesavka_server::{usb_gadget::UsbGadget, video};
use lesavka_common::lesavka::{
relay_server::{Relay, RelayServer},
KeyboardReport, MouseReport,
MonitorRequest, VideoPacket,
};
// ───────────────── helper ─────────────────────
/// Installs the global `tracing` subscriber: a console layer plus a
/// non-blocking file layer writing to `/tmp/lesavka-server.log`.
///
/// The filter is taken from the default environment variable when it parses;
/// otherwise `lesavka_server=info` is used.
///
/// Returns the guard that belongs to the non-blocking file writer, which the
/// caller should hold for as long as logging is needed.
///
/// # Errors
/// Fails if the log file cannot be opened.
fn init_tracing() -> anyhow::Result<WorkerGuard> {
    // Filter: environment first, built-in default second.
    let filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("lesavka_server=info"),
    };

    // File sink: opened in append mode, created on first use.
    let mut options = std::fs::OpenOptions::new();
    options.create(true).write(true).append(true);
    let log_file = options.open("/tmp/lesavka-server.log")?;
    let (log_writer, guard) = non_blocking(log_file);

    // One layer per destination; the file layer has ANSI colours disabled.
    let to_console = fmt::layer().with_target(true).with_thread_ids(true);
    let to_file = fmt::layer().with_writer(log_writer).with_ansi(false);

    tracing_subscriber::registry()
        .with(filter)
        .with(to_console)
        .with(to_file)
        .init();

    Ok(guard)
}
/// Opens `path` write-only with `O_NONBLOCK`, retrying with exponential
/// back-off when the open fails.
///
/// * `path` - file to open.
/// * `retries` - number of additional attempts after the first one, so up to
///   `retries + 1` opens are tried in total.
/// * `base_delay_ms` - delay before the first retry; it doubles on every
///   subsequent attempt. There is no upper bound on the delay other than
///   saturation at `u64::MAX`, so large `retries` values mean very long waits.
///
/// Each failure is logged together with the PIDs reported by `owners_of`.
///
/// # Errors
/// Returns the last I/O error, with the path added as context, once every
/// attempt has failed.
async fn open_with_retry(
    path: &str,
    retries: usize,
    base_delay_ms: u64,
) -> anyhow::Result<tokio::fs::File> {
    let start = std::time::Instant::now();
    for attempt in 0..=retries {
        match OpenOptions::new()
            .write(true)
            .custom_flags(libc::O_NONBLOCK)
            .open(path)
            .await
        {
            Ok(f) => {
                info!("✅ opened {path} (attempt {attempt}, {} ms)",
                    start.elapsed().as_millis());
                return Ok(f);
            }
            Err(e) if attempt < retries => {
                // Saturate at every step: the power already saturates, and a
                // plain `*` on a saturated factor would overflow.
                let exponent = attempt.min(u32::MAX as usize) as u32;
                let delay = base_delay_ms.saturating_mul(2u64.saturating_pow(exponent));
                warn!(
                    "🕒 {path} busy: {e:?}; holders=[{}]; retry {attempt}/{retries} in {delay} ms",
                    owners_of(path)
                );
                tokio::time::sleep(Duration::from_millis(delay)).await;
            }
            Err(e) => {
                error!(
                    "💥 all retries exhausted for {path} ({} ms): {e:?}; holders=[{}]",
                    start.elapsed().as_millis(),
                    owners_of(path)
                );
                return Err(e).with_context(|| format!("opening {path}"));
            }
        }
    }
    // The final iteration (attempt == retries) always returns from one of the
    // arms above, so the loop cannot fall through.
    unreachable!()
}
/// Lists the PIDs of processes that currently hold `path` open.
///
/// Scans `/proc/<pid>/fd/*` and compares the (device, inode) pair of every
/// open descriptor with that of `path`. Processes whose `fd` directory cannot
/// be read (for example because of permissions) are skipped silently.
///
/// Returns the matching PIDs joined by `,`, or `"-"` when `path` cannot be
/// stat'ed or no holder is found.
fn owners_of(path: &str) -> String {
    use std::{fs, os::unix::fs::MetadataExt};

    // Inode numbers are only unique per filesystem, so the device id is part
    // of the identity as well.
    let Ok(target) = fs::metadata(path).map(|m| (m.dev(), m.ino())) else {
        return "-".into();
    };

    let mut pids = Vec::new();
    if let Ok(entries) = fs::read_dir("/proc") {
        for entry in entries.flatten() {
            let name = entry.file_name();
            let pid = name.to_string_lossy();
            // Only all-numeric directory names are process entries.
            if !pid.chars().all(|c| c.is_ascii_digit()) {
                continue;
            }
            let Ok(fds) = fs::read_dir(entry.path().join("fd")) else {
                continue;
            };
            // Entries under `fd/` are symlinks. `DirEntry::metadata` would
            // stat the link itself, so resolve through `fs::metadata`, which
            // follows the link to the file that is actually open.
            let holds_target = fds.flatten().any(|fd| {
                fs::metadata(fd.path())
                    .map(|m| (m.dev(), m.ino()) == target)
                    .unwrap_or(false)
            });
            if holds_target {
                pids.push(pid.into_owned());
            }
        }
    }

    if pids.is_empty() {
        "-".into()
    } else {
        pids.join(",")
    }
}
/*─────────────────── tonic service ─────────────────────*/
/// State behind the `Relay` gRPC service.
struct Handler {
    /// Keyboard HID endpoint; `Handler::make` opens it from `/dev/hidg0`.
    kb: Arc<Mutex<tokio::fs::File>>,
    /// Mouse HID endpoint; `Handler::make` opens it from `/dev/hidg1`.
    ms: Arc<Mutex<tokio::fs::File>>,
    /// USB gadget handle whose `cycle()` is invoked at construction time and,
    /// when `did_cycle` is still false, from `stream_keyboard`.
    gadget: UsbGadget,
    /// Records that the gadget has already been cycled; `Handler::make`
    /// initialises it to `true`.
    did_cycle: AtomicBool,
}
impl Handler {
    /// Builds the service state: cycles the gadget, waits one second, then
    /// opens the keyboard and mouse HID endpoints (with retries).
    ///
    /// # Errors
    /// Propagates a failure from `gadget.cycle()` or from opening either
    /// endpoint.
    async fn make(gadget: UsbGadget) -> anyhow::Result<Self> {
        info!("🛠️ Handler::make - cycling gadget ...");
        gadget.cycle()?;
        // Pause after the cycle before touching the endpoints.
        tokio::time::sleep(Duration::from_secs(1)).await;

        info!("🛠️ opening HID endpoints ...");
        let keyboard = open_with_retry("/dev/hidg0", 20, 50).await?;
        let mouse = open_with_retry("/dev/hidg1", 20, 50).await?;
        info!("✅ HID endpoints ready");

        let handler = Self {
            kb: Arc::new(Mutex::new(keyboard)),
            ms: Arc::new(Mutex::new(mouse)),
            // The gadget was cycled above, so mark it as done.
            did_cycle: AtomicBool::new(true),
            gadget,
        };
        Ok(handler)
    }
}
#[tonic::async_trait]
impl Relay for Handler {
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send + Sync + 'static>>;
async fn stream_keyboard(
&self,
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
// self.gadget.cycle().map_err(|e| Status::internal(e.to_string()))?;
if !self.did_cycle.swap(true, Ordering::SeqCst) {
self.gadget
.cycle()
.map_err(|e| Status::internal(e.to_string()))?;
tokio::time::sleep(Duration::from_millis(500)).await;
}
let (tx, rx) = tokio::sync::mpsc::channel(32);
let kb = self.kb.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
// kb.lock().await.write_all(&pkt.data).await?;
loop {
match kb.lock().await.write_all(&pkt.data).await {
Ok(()) => {
trace!("⌨️ wrote {}", pkt.data.iter()
.map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
},
Err(e) if matches!(e.raw_os_error(), Some(libc::EBUSY) | Some(libc::EAGAIN)) => {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
Err(e) => return Err(Status::internal(e.to_string())),
}
}
tx.send(Ok(pkt)).await;//.ok(); // best-effort echo
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(4096); // higher burst
let ms = self.ms.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
let mut boot_mode = true;
while let Some(pkt) = s.next().await.transpose()? {
loop {
match ms.lock().await.write_all(&pkt.data).await {
Ok(()) => {
trace!("🖱️ wrote {}", pkt.data.iter()
.map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
}
Err(e) if matches!(e.raw_os_error(), Some(libc::EBUSY) | Some(libc::EAGAIN)) => {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
Err(e) => return Err(Status::internal(e.to_string())),
}
}
let _ = tx.send(Ok(pkt)).await;
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn capture_video(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
let r = req.into_inner();
// let devs = loop {
// let list = list_gc311_devices()
// .map_err(|e| Status::internal(format!("enum v4l2: {e}")))?;
// if !list.is_empty() { break list; }
// tokio::time::sleep(Duration::from_secs(1)).await;
// };
// let dev = devs
// .get(r.id as usize)
// .ok_or_else(|| Status::invalid_argument(format!("monitor id {} absent", r.id)))?
// .to_owned();
let dev = match r.id {
0 => "/dev/lesavka_l_eye",
1 => "/dev/lesavka_r_eye",
_ => return Err(Status::invalid_argument("monitor id must be 0 or 1")),
}
.to_string();
info!("🎥 streaming {dev} at ≤{} kb/s", r.max_bitrate);
let s = video::spawn_camera(&dev, r.id, r.max_bitrate)
.await
.map_err(|e| Status::internal(format!("{e:#?}")))?;
Ok(Response::new(Box::pin(s) as _))
}
}
/*─────────────────── main ──────────────────────────────*/
/// Entry point: sets up logging and a panic hook, builds the `Handler`, and
/// serves the `Relay` gRPC service on `0.0.0.0:50051`.
///
/// Exits with status 1 when the handler cannot be created or the server
/// terminates with an error.
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
    /* logging */
    let log_guard: WorkerGuard = init_tracing()?;

    // Route panics through `tracing` together with a backtrace.
    panic::set_hook(Box::new(|p| {
        let bt = Backtrace::force_capture();
        error!("💥 panic: {p}\n{bt}");
    }));

    let gadget = UsbGadget::new("lesavka");
    let handler = match Handler::make(gadget.clone()).await {
        Ok(h) => h,
        Err(e) => {
            error!("💥 failed to create Handler: {e:#}");
            // `process::exit` runs no destructors, so drop the guard by hand
            // to give the non-blocking file writer a chance to flush.
            drop(log_guard);
            std::process::exit(1);
        }
    };

    info!("🌐 lesavkaserver listening on 0.0.0.0:50051");
    if let Err(e) = Server::builder()
        .add_service(RelayServer::new(handler))
        .serve(([0, 0, 0, 0], 50051).into())
        .await
    {
        error!("💥 gRPC server exited: {e:#}");
        // Same as above: flush the file log before the hard exit.
        drop(log_guard);
        std::process::exit(1);
    }

    Ok(())
}