lesavka/server/src/main.rs
2025-06-25 18:23:38 -05:00

229 lines
8.2 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! lesavka-server — receive HidReport and write to /dev/hidg0
// sever/src/main.rs
#![forbid(unsafe_code)]
use std::sync::atomic::{AtomicBool, Ordering};
use std::{pin::Pin, sync::Arc, time::Duration};
use futures_util::{Stream, StreamExt};
use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::{wrappers::ReceiverStream};
use tonic::{transport::Server, Request, Response, Status};
use anyhow::Context as _;
use tracing::{info, trace, warn};
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use tracing_appender::non_blocking;
use tracing_appender::non_blocking::WorkerGuard;
use udev::{MonitorBuilder};
use lesavka_server::{usb_gadget::UsbGadget, video};
use lesavka_common::lesavka::{
relay_server::{Relay, RelayServer},
KeyboardReport, MouseReport,
MonitorRequest, VideoPacket,
};
// ───────────────── helper ─────────────────────
fn init_tracing() -> anyhow::Result<WorkerGuard> {
// 1. create file writer + guard
let file = std::fs::OpenOptions::new()
.create(true).write(true).truncate(true)
.open("/tmp/lesavka-server.log")?;
let (file_writer, guard) = non_blocking(file);
// 2. build subscriber once
let env = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("lesavka_server=info"));
let console_layer = fmt::layer()
.with_target(true)
.with_thread_ids(true);
let file_layer = fmt::layer()
.with_writer(file_writer)
.with_ansi(false);
tracing_subscriber::registry()
.with(env)
.with(console_layer)
.with(file_layer)
.init();
Ok(guard)
}
/*─────────────────── tonic service ─────────────────────*/
struct Handler {
kb: Arc<Mutex<tokio::fs::File>>,
ms: Arc<Mutex<tokio::fs::File>>,
gadget: UsbGadget,
did_cycle: AtomicBool,
}
impl Handler {
async fn make(gadget: UsbGadget) -> anyhow::Result<Self> {
gadget.cycle()?;
tokio::time::sleep(Duration::from_secs(1)).await;
let kb = OpenOptions::new()
.write(true).custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg0").await
.context("opening /dev/hidg0")?;
let ms = OpenOptions::new()
.write(true).custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg1").await
.context("opening /dev/hidg1")?;
Ok(Self { kb: Arc::new(Mutex::new(kb)),
ms: Arc::new(Mutex::new(ms)),
gadget,
did_cycle: AtomicBool::new(true),
})
}
}
#[tonic::async_trait]
impl Relay for Handler {
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send + Sync + 'static>>;
async fn stream_keyboard(
&self,
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
// self.gadget.cycle().map_err(|e| Status::internal(e.to_string()))?;
if !self.did_cycle.swap(true, Ordering::SeqCst) {
self.gadget
.cycle()
.map_err(|e| Status::internal(e.to_string()))?;
tokio::time::sleep(Duration::from_millis(500)).await;
}
let (tx, rx) = tokio::sync::mpsc::channel(32);
let kb = self.kb.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
// kb.lock().await.write_all(&pkt.data).await?;
loop {
match kb.lock().await.write_all(&pkt.data).await {
Ok(()) => {
trace!("⌨️ wrote {}", pkt.data.iter()
.map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
},
Err(e) if matches!(e.raw_os_error(), Some(libc::EBUSY) | Some(libc::EAGAIN)) => {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
Err(e) => return Err(Status::internal(e.to_string())),
}
}
tx.send(Ok(pkt)).await;//.ok(); // best-effort echo
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(4096); // higher burst
let ms = self.ms.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
let mut boot_mode = true;
while let Some(pkt) = s.next().await.transpose()? {
loop {
match ms.lock().await.write_all(&pkt.data).await {
Ok(()) => {
trace!("🖱️ wrote {}", pkt.data.iter()
.map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
}
Err(e) if matches!(e.raw_os_error(), Some(libc::EBUSY) | Some(libc::EAGAIN)) => {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
Err(e) => return Err(Status::internal(e.to_string())),
}
}
let _ = tx.send(Ok(pkt)).await;
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn capture_video(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
let r = req.into_inner();
// let devs = loop {
// let list = list_gc311_devices()
// .map_err(|e| Status::internal(format!("enum v4l2: {e}")))?;
// if !list.is_empty() { break list; }
// tokio::time::sleep(Duration::from_secs(1)).await;
// };
// let dev = devs
// .get(r.id as usize)
// .ok_or_else(|| Status::invalid_argument(format!("monitor id {} absent", r.id)))?
// .to_owned();
let dev = match r.id {
0 => "/dev/lesavka_l_eye",
1 => "/dev/lesavka_r_eye",
_ => return Err(Status::invalid_argument("monitor id must be 0 or 1")),
}
.to_string();
info!("🎥 streaming {dev} at ≤{}kb/s", r.max_bitrate);
let s = video::spawn_camera(&dev, r.id, r.max_bitrate)
.await
.map_err(|e| Status::internal(format!("{e:#?}")))?;
Ok(Response::new(Box::pin(s) as _))
}
}
/*─────────────────── main ──────────────────────────────*/
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
/* logging */
let _log_guard: WorkerGuard = init_tracing()?;
/* autocycle task */
// tokio::spawn(async { monitor_gc311_disconnect().await.ok(); });
let gadget = UsbGadget::new("lesavka");
let handler = Handler::make(gadget.clone()).await?;
// tokio::spawn({
// let gadget = gadget.clone();
// async move {
// loop {
// tokio::time::sleep(Duration::from_secs(4)).await;
// if LAST_HID_WRITE.elapsed().as_secs() > 3 {
// warn!("no HID traffic in 3s cycling UDC");
// let _ = gadget.cycle();
// }
// }
// }
// });
println!("🌐 lesavka-server listening on 0.0.0.0:50051");
Server::builder()
.add_service(RelayServer::new(handler))
.serve(([0, 0, 0, 0], 50051).into())
.await?;
Ok(())
}