lesavka/server/src/main.rs
2025-07-04 03:41:39 -05:00

314 lines
11 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! lesavka-server - **auto-cycle disabled**
// server/src/main.rs
#![forbid(unsafe_code)]
use std::{panic, backtrace::Backtrace, pin::Pin, sync::Arc};
use std::sync::atomic::AtomicBool;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::Context as _;
use futures_util::{Stream, StreamExt};
use tokio::{
fs::{OpenOptions},
io::AsyncWriteExt,
sync::Mutex,
};
use gstreamer as gst;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tonic::transport::Server;
use tonic_reflection::server::{Builder as ReflBuilder};
use tracing::{info, warn, error, trace, debug};
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use tracing_appender::non_blocking::WorkerGuard;
use lesavka_common::lesavka::{
Empty, ResetUsbReply,
relay_server::{Relay, RelayServer},
KeyboardReport, MouseReport,
MonitorRequest, VideoPacket, AudioPacket
};
use lesavka_server::{gadget::UsbGadget, video, audio, handshake::HandshakeSvc};
/*──────────────── constants ────────────────*/
/// Compile-time switch for the USB gadget reset performed at start-up.
///
/// When `true`, `Handler::new` calls `gadget.cycle()` once before opening the
/// HID endpoints. **false** = never reset automatically; the explicit
/// `reset_usb` RPC still cycles the gadget on request.
const AUTO_CYCLE: bool = false;
/*──────────────── logging ───────────────────*/
fn init_tracing() -> anyhow::Result<WorkerGuard> {
let file = std::fs::OpenOptions::new()
.create(true).truncate(true).write(true)
.open("/tmp/lesavka-server.log")?;
let (file_writer, guard) = tracing_appender::non_blocking(file);
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new("lesavka_server=info,lesavka_server::video=warn")
});
let filter_str = env_filter.to_string();
tracing_subscriber::registry()
.with(env_filter)
.with(
fmt::layer()
.with_target(true)
.with_thread_ids(true),
)
.with(
fmt::layer()
.with_writer(file_writer)
.with_ansi(false)
.with_target(true)
.with_level(true),
)
.init();
tracing::info!("📜 effective RUST_LOG = \"{}\"", filter_str);
Ok(guard)
}
/*──────────────── helpers ───────────────────*/
/// Opens `path` write-only with `O_NONBLOCK`, retrying while the device is busy.
///
/// Up to 200 attempts are made, sleeping 50 ms after each `EBUSY` failure
/// (roughly ten seconds in total).
///
/// # Errors
/// * Any open error other than `EBUSY` is returned immediately, wrapped with
///   an `opening {path}` context.
/// * If every attempt reports `EBUSY`, a `timeout waiting for {path}` error
///   is returned.
async fn open_with_retry(path: &str) -> anyhow::Result<tokio::fs::File> {
    const MAX_ATTEMPTS: i32 = 200; // ≈10s at 50 ms per retry
    const RETRY_DELAY: Duration = Duration::from_millis(50);

    let mut options = OpenOptions::new();
    options.write(true).custom_flags(libc::O_NONBLOCK);

    for attempt in 1..=MAX_ATTEMPTS {
        let err = match options.open(path).await {
            Ok(file) => {
                info!("✅ {path} opened on attempt #{attempt}");
                return Ok(file);
            }
            Err(err) => err,
        };

        // Only "device busy" is worth waiting for; everything else is fatal.
        if err.raw_os_error() != Some(libc::EBUSY) {
            return Err(err).with_context(|| format!("opening {path}"));
        }

        trace!("⏳ {path} busy… retry #{attempt}");
        tokio::time::sleep(RETRY_DELAY).await;
    }

    Err(anyhow::anyhow!("timeout waiting for {path}"))
}
/// Returns the wall-clock instant of the next whole-minute boundary.
///
/// The result is always strictly after "now": when called exactly on a
/// boundary it yields the following minute, not the current one.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn next_minute() -> SystemTime {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let elapsed_minutes = since_epoch.as_secs() / 60;
    let boundary_secs = (elapsed_minutes + 1) * 60;
    UNIX_EPOCH + Duration::from_secs(boundary_secs)
}
/*──────────────── Handler ───────────────────*/
/// State behind the `Relay` gRPC service implementation.
struct Handler {
/// Write handle for `/dev/hidg0` (opened in `Handler::new`); keyboard
/// reports are written here. Shared with the per-stream tasks via `Arc`.
kb: Arc<Mutex<tokio::fs::File>>,
/// Write handle for `/dev/hidg1` (opened in `Handler::new`); mouse
/// reports are written here. Shared with the per-stream tasks via `Arc`.
ms: Arc<Mutex<tokio::fs::File>>,
/// USB gadget handle; `reset_usb` calls `cycle()` on it.
gadget: UsbGadget,
/// Initialised to `false` in `Handler::new`.
/// NOTE(review): not read or written anywhere else in this file - looks
/// like a leftover from the removed auto-cycle logic; confirm before removing.
did_cycle: AtomicBool,
}
impl Handler {
    /// Builds the service state: optionally cycles the gadget, then opens
    /// both HID endpoints.
    ///
    /// When `AUTO_CYCLE` is set the gadget is cycled once and any failure is
    /// deliberately ignored; otherwise no reset is attempted.
    ///
    /// # Errors
    /// Propagates the error from `open_with_retry` if `/dev/hidg0` or
    /// `/dev/hidg1` cannot be opened.
    async fn new(gadget: UsbGadget) -> anyhow::Result<Self> {
        if !AUTO_CYCLE {
            info!("🛠️ AUTO_CYCLE disabled - no initial reset");
        } else {
            info!("🛠️ Initial USB reset…");
            // Best effort: the board may boot without a host attached.
            let _ = gadget.cycle();
        }

        info!("🛠️ opening HID endpoints …");
        let keyboard = open_with_retry("/dev/hidg0").await?;
        let mouse = open_with_retry("/dev/hidg1").await?;
        info!("✅ HID endpoints ready");

        let handler = Self {
            kb: Arc::new(Mutex::new(keyboard)),
            ms: Arc::new(Mutex::new(mouse)),
            gadget,
            did_cycle: AtomicBool::new(false),
        };
        Ok(handler)
    }
}
/*──────────────── gRPC service ─────────────*/
#[tonic::async_trait]
impl Relay for Handler {
/* existing streams ─ unchanged, except: no more auto-reset */
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item=Result<VideoPacket,Status>> + Send>>;
type CaptureAudioStream = Pin<Box<dyn Stream<Item=Result<AudioPacket,Status>> + Send>>;
type StreamMicrophoneStream = ReceiverStream<Result<Empty, Status>>;
type StreamCameraStream = ReceiverStream<Result<Empty, Status>>;
async fn stream_keyboard(
&self,
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(32);
let kb = self.kb.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if let Err(e) = kb.lock().await.write_all(&pkt.data).await {
warn!("⌨️ write failed: {e} (dropped)");
}
tx.send(Ok(pkt)).await.ok();
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(1024);
let ms = self.ms.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if let Err(e) = ms.lock().await.write_all(&pkt.data).await {
warn!("🖱️ write failed: {e} (dropped)");
}
tx.send(Ok(pkt)).await.ok();
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_microphone(
&self,
req: Request<tonic::Streaming<AudioPacket>>,
) -> Result<Response<Self::StreamMicrophoneStream>, Status> {
// 1 ─ build once, early
let mut sink = audio::Voice::new("hw:UAC2Gadget,0").await
.map_err(|e| Status::internal(format!("{e:#}")))?;
// 2 ─ dummy outbound stream (same trick as before)
let (tx, rx) = tokio::sync::mpsc::channel(1);
// 3 ─ drive the sink in a background task
tokio::spawn(async move {
let mut inbound = req.into_inner();
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
while let Some(pkt) = inbound.next().await.transpose()? {
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 {
tracing::trace!("🎤⬇ srv pkt#{n} {} bytes", pkt.data.len());
}
sink.push(&pkt);
}
sink.finish(); // flush on EOS
let _ = tx.send(Ok(Empty {})).await;
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_camera(
&self,
req: Request<tonic::Streaming<VideoPacket>>,
) -> Result<Response<Self::StreamCameraStream>, Status> {
// map gRPC camera id → UVC device
let uvc = std::env::var("LESAVKA_UVC_DEV")
.unwrap_or_else(|_| "/dev/video4".into());
// build once
let relay = video::CameraRelay::new(0, &uvc)
.map_err(|e| Status::internal(format!("{e:#}")))?;
// dummy outbound (same pattern as other streams)
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
relay.feed(pkt); // ← all logging inside video.rs
}
tx.send(Ok(Empty {})).await.ok();
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn capture_video(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
let id = req.into_inner().id;
let dev = match id {
0 => "/dev/lesavka_l_eye",
1 => "/dev/lesavka_r_eye",
_ => return Err(Status::invalid_argument("monitor id must be 0 or 1")),
};
debug!("🎥 streaming {dev}");
let s = video::eye_ball(dev, id, 6_000)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(Box::pin(s)))
}
async fn capture_audio(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureAudioStream>, Status> {
// Only one speaker stream for now; both 0/1 → same ALSA dev.
let _id = req.into_inner().id;
// Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging).
let dev = std::env::var("LESAVKA_ALSA_DEV")
.unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
let s = audio::ear(&dev, 0)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(Box::pin(s)))
}
/*────────────── USB-reset RPC ────────────*/
async fn reset_usb(
&self,
_req: Request<Empty>,
) -> Result<Response<ResetUsbReply>, Status> {
info!("🔴 explicit ResetUsb() called");
match self.gadget.cycle() {
Ok(_) => Ok(Response::new(ResetUsbReply { ok: true })),
Err(e) => {
error!("💥 cycle failed: {e:#}");
Err(Status::internal(e.to_string()))
}
}
}
}
/*──────────────── main ───────────────────────*/
/// Entry point: sets up logging and the panic hook, opens the HID endpoints,
/// then serves the gRPC services on `0.0.0.0:50051` until the server stops.
///
/// # Errors
/// Returns an error if tracing initialisation, `Handler::new`, construction
/// of the reflection service, or the gRPC server itself fails.
#[tokio::main(worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
    // Keep the guard alive for the whole run so the file appender keeps working.
    let _guard = init_tracing()?;

    // Route panics through tracing so they reach the log file with a backtrace.
    panic::set_hook(Box::new(|p| {
        let bt = Backtrace::force_capture();
        error!("💥 panic: {p}\n{bt}");
    }));

    let gadget = UsbGadget::new("lesavka");
    let handler = Handler::new(gadget.clone()).await?;

    // Propagate a reflection build failure instead of panicking in `main`.
    let reflection = ReflBuilder::configure()
        .build_v1()
        .context("building gRPC reflection service")?;

    info!("🌐 lesavka-server listening on 0.0.0.0:50051");
    Server::builder()
        .tcp_nodelay(true)
        .max_frame_size(Some(2*1024*1024))
        .add_service(RelayServer::new(handler))
        .add_service(HandshakeSvc::server())
        .add_service(reflection)
        .serve(([0,0,0,0], 50051).into())
        .await
        .context("gRPC server terminated with an error")?;
    Ok(())
}