lesavka/server/src/main.rs

182 lines
6.2 KiB
Rust
Raw Normal View History

2025-06-23 07:18:26 -05:00
//! lesavka-server — receive keyboard/mouse HID reports over gRPC, write them to /dev/hidg0 and /dev/hidg1, and stream GC311 capture video back
2025-06-16 21:47:01 -05:00
// server/src/main.rs
2025-06-01 16:04:00 -05:00
#![forbid(unsafe_code)]
2025-06-01 13:31:22 -05:00
2025-06-21 05:21:57 -05:00
use anyhow::Context;
use futures_util::Stream;
use std::{pin::Pin, sync::Arc, time::Duration};
2025-06-12 01:59:17 -05:00
use tokio::{fs::{File, OpenOptions}, io::AsyncWriteExt, sync::Mutex};
2025-06-21 05:21:57 -05:00
use tokio_stream::{wrappers::ReceiverStream};
2025-06-02 20:41:36 -05:00
use tonic::{transport::Server, Request, Response, Status};
2025-06-21 05:21:57 -05:00
use tracing::{error, info, trace, warn};
2025-06-15 22:15:50 -05:00
use tracing_subscriber::{fmt, EnvFilter};
2025-06-21 05:21:57 -05:00
use udev::{Enumerator, MonitorBuilder};
2025-06-23 07:18:26 -05:00
use lesavka_server::{video, usb_reset};
2025-06-01 21:26:57 -05:00
2025-06-23 07:18:26 -05:00
use lesavka_common::lesavka::{
2025-06-01 21:26:57 -05:00
relay_server::{Relay, RelayServer},
2025-06-17 20:54:31 -05:00
KeyboardReport, MouseReport,
2025-06-21 05:21:57 -05:00
MonitorRequest, VideoPacket,
2025-06-01 16:04:00 -05:00
};
2025-06-02 20:24:00 -05:00
2025-06-21 05:21:57 -05:00
/*─────────────────── GC311 discovery ───────────────────*/
/// Enumerates the V4L2 capture nodes whose sysfs `name` contains "gc311"
/// (case-insensitive).
///
/// Returns the matching device paths (`/dev/videoN`) ordered by their numeric
/// index `N`, so that position `i` in the result is stable and `video10`
/// sorts after `video2`.
///
/// # Errors
/// Fails only when `/sys/class/video4linux` itself cannot be listed. Entries
/// that are not `videoN` nodes, or whose `name` attribute cannot be read, are
/// skipped instead of aborting the whole enumeration.
fn list_gc311_devices() -> anyhow::Result<Vec<String>> {
    let mut found: Vec<(u32, String)> = Vec::new();

    for entry in std::fs::read_dir("/sys/class/video4linux")? {
        let entry = entry?;
        let file_name = entry.file_name();
        let node = file_name.to_string_lossy();

        // Only `videoN` entries map onto a `/dev/videoN` capture node.
        let index = match node.strip_prefix("video").and_then(|n| n.parse::<u32>().ok()) {
            Some(index) => index,
            None => continue,
        };

        // A node may vanish between listing and reading; that must not hide
        // the other devices.
        let name = match std::fs::read_to_string(entry.path().join("name")) {
            Ok(name) => name,
            Err(_) => continue,
        };

        if name.to_lowercase().contains("gc311") {
            found.push((index, format!("/dev/{node}")));
        }
    }

    // Numeric order: a plain string sort would put video10 before video2.
    found.sort_unstable_by_key(|(index, _)| *index);
    Ok(found.into_iter().map(|(_, dev)| dev).collect())
}
/// Background task: watches udev for USB events whose `PRODUCT` property
/// matches `7ca/3311/*` and, for every *remove* event that still exposes
/// `busnum` and `devnum`, asks `usb_reset::cycle_port` to cycle that port.
///
/// Returns `Ok(())` once the monitor stops yielding events; setup failures
/// of the udev monitor are propagated as errors.
///
/// NOTE(review): depending on the `udev` crate version, `next()` may return
/// `None` as soon as no event is pending, which would end this loop right
/// away — confirm against the version in use.
/// NOTE(review): the loop contains no `.await`, so it occupies its runtime
/// worker for as long as it runs — confirm this is intended.
async fn monitor_gc311_disconnect() -> anyhow::Result<()> {
    let mut socket = MonitorBuilder::new()?
        .match_subsystem("usb")?
        .match_property("PRODUCT", "7ca/3311/*")? // vendor: 0x07ca, device 0x3311
        .listen()?;

    while let Some(event) = socket.next() {
        // Only removals are interesting; everything else is ignored.
        if event.event_type() != udev::EventType::Remove {
            continue;
        }

        let busnum = event.attribute_value("busnum");
        let devnum = event.attribute_value("devnum");
        if let (Some(busnum), Some(devnum)) = (busnum, devnum) {
            // Panics if either attribute is not valid UTF-8.
            usb_reset::cycle_port(busnum.to_str().unwrap(), devnum.to_str().unwrap());
        }
    }

    Ok(())
}
/*─────────────────── tonic service ─────────────────────*/
2025-06-02 20:41:36 -05:00
struct Handler {
2025-06-17 20:54:31 -05:00
kb: Arc<Mutex<tokio::fs::File>>,
ms: Arc<Mutex<tokio::fs::File>>,
2025-06-01 16:04:00 -05:00
}
2025-06-01 13:31:22 -05:00
2025-06-02 20:41:36 -05:00
#[tonic::async_trait]
2025-06-16 22:14:52 -05:00
impl Relay for Handler {
2025-06-17 08:17:23 -05:00
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
2025-06-21 05:21:57 -05:00
type CaptureVideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send + Sync + 'static>>;
2025-06-01 13:31:22 -05:00
2025-06-17 08:17:23 -05:00
async fn stream_keyboard(
2025-06-01 13:31:22 -05:00
&self,
2025-06-17 08:17:23 -05:00
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(32);
2025-06-12 01:57:08 -05:00
let kb = self.kb.clone();
2025-06-02 20:41:36 -05:00
tokio::spawn(async move {
2025-06-17 08:17:23 -05:00
let mut s = req.into_inner();
2025-06-17 20:54:31 -05:00
while let Some(pkt) = s.next().await.transpose()? {
kb.lock().await.write_all(&pkt.data).await?;
tx.send(Ok(pkt)).await;//.ok(); // best-effort echo
2025-06-17 08:17:23 -05:00
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
2025-06-16 19:19:14 -05:00
2025-06-17 08:17:23 -05:00
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
2025-06-17 22:36:23 -05:00
let (tx, rx) = tokio::sync::mpsc::channel(4096); // higher burst
2025-06-17 08:17:23 -05:00
let ms = self.ms.clone();
2025-06-16 19:19:14 -05:00
2025-06-17 08:17:23 -05:00
tokio::spawn(async move {
let mut s = req.into_inner();
2025-06-18 02:06:11 -05:00
let mut boot_mode = true;
2025-06-17 20:54:31 -05:00
while let Some(pkt) = s.next().await.transpose()? {
2025-06-17 23:13:13 -05:00
loop {
match ms.lock().await.write_all(&pkt.data).await {
2025-06-18 02:06:11 -05:00
Ok(()) => {
trace!("🖱️ wrote {}", pkt.data.iter()
.map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
tokio::time::sleep(Duration::from_micros(500)).await;
2025-06-17 23:13:13 -05:00
}
Err(e) => return Err(Status::internal(format!("hidg1: {e}"))),
}
}
let _ = tx.send(Ok(pkt)).await;
2025-06-01 13:31:22 -05:00
}
2025-06-17 08:17:23 -05:00
Ok::<(), Status>(())
2025-06-01 13:31:22 -05:00
});
2025-06-17 08:17:23 -05:00
Ok(Response::new(ReceiverStream::new(rx)))
2025-06-01 13:31:22 -05:00
}
2025-06-21 05:21:57 -05:00
async fn capture_video(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
let r = req.into_inner();
let devs = list_gc311_devices()
.map_err(|e| Status::internal(format!("enum v4l2: {e}")))?;
let dev = devs
.get(r.id as usize)
.ok_or_else(|| Status::invalid_argument(format!("monitor id {} absent", r.id)))?
.to_owned();
info!("🎥 streaming {dev} at ≤{}kb/s", r.max_bitrate);
let s = video::spawn_camera(&dev, r.id, r.max_bitrate)
.await
.map_err(|e| Status::internal(format!("{e:#?}")))?;
Ok(Response::new(Box::pin(s) as _))
}
2025-06-01 13:31:22 -05:00
}
2025-06-21 05:21:57 -05:00
/*─────────────────── main ──────────────────────────────*/
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
2025-06-02 20:41:36 -05:00
async fn main() -> anyhow::Result<()> {
2025-06-21 05:21:57 -05:00
/* logging */
2025-06-16 00:10:03 -05:00
fmt().with_env_filter(
2025-06-21 05:21:57 -05:00
EnvFilter::try_from_default_env()
2025-06-23 07:18:26 -05:00
.unwrap_or_else(|_| EnvFilter::new("lesavka_server=info")),
2025-06-16 00:05:39 -05:00
)
.init();
2025-06-05 22:44:27 -05:00
2025-06-21 05:21:57 -05:00
/* autocycle task */
tokio::spawn(async { monitor_gc311_disconnect().await.ok(); });
2025-06-12 01:57:08 -05:00
let kb = OpenOptions::new()
2025-06-02 20:41:36 -05:00
.write(true)
.open("/dev/hidg0")
.await?;
2025-06-12 01:57:08 -05:00
let ms = OpenOptions::new()
.write(true)
2025-06-18 02:06:11 -05:00
.custom_flags(libc::O_NONBLOCK)
2025-06-12 01:57:08 -05:00
.open("/dev/hidg1")
.await?;
let handler = Handler {
kb: Arc::new(Mutex::new(kb)),
ms: Arc::new(Mutex::new(ms)),
};
2025-06-01 16:04:00 -05:00
2025-06-23 07:18:26 -05:00
println!("🌐 lesavka-server listening on 0.0.0.0:50051");
2025-06-01 13:31:22 -05:00
Server::builder()
2025-06-02 20:41:36 -05:00
.add_service(RelayServer::new(handler))
.serve(([0, 0, 0, 0], 50051).into())
2025-06-01 13:31:22 -05:00
.await?;
Ok(())
}