lesavka/server/src/main.rs

602 lines
22 KiB
Rust

// lesavka-server - gadget cycle guarded by env
// server/src/main.rs
#[allow(clippy::useless_attribute)]
#[forbid(unsafe_code)]
use futures_util::{Stream, StreamExt};
use std::sync::atomic::AtomicBool;
use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use tonic_reflection::server::Builder as ReflBuilder;
use tracing::{debug, error, info, warn};
use lesavka_common::lesavka::{
AudioPacket, CapturePowerCommand, CapturePowerState, Empty, KeyboardReport, MonitorRequest,
MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest, VideoPacket,
relay_server::{Relay, RelayServer},
};
use lesavka_server::{
audio, camera, camera_runtime::CameraRuntime, capture_power::CapturePowerManager,
gadget::UsbGadget, handshake::HandshakeSvc, paste, runtime_support,
runtime_support::init_tracing, uvc_runtime, video,
};
/*──────────────── constants ────────────────*/
const PKG_NAME: &str = env!("CARGO_PKG_NAME");
type VideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send>>;
type AudioStream = Pin<Box<dyn Stream<Item = Result<AudioPacket, Status>> + Send>>;
fn hid_endpoint(index: u8) -> String {
std::env::var("LESAVKA_HID_DIR")
.map(|dir| format!("{dir}/hidg{index}"))
.unwrap_or_else(|_| format!("/dev/hidg{index}"))
}
fn live_keyboard_report_delay() -> Duration {
std::env::var("LESAVKA_LIVE_KEYBOARD_REPORT_DELAY_MS")
.ok()
.and_then(|value| value.parse::<u64>().ok())
.map(Duration::from_millis)
.unwrap_or_else(|| Duration::from_millis(8))
}
/*──────────────── Handler ───────────────────*/
struct Handler {
kb: Arc<Mutex<tokio::fs::File>>,
ms: Arc<Mutex<tokio::fs::File>>,
gadget: UsbGadget,
did_cycle: Arc<AtomicBool>,
camera_rt: Arc<CameraRuntime>,
capture_power: CapturePowerManager,
}
impl Handler {
async fn new(gadget: UsbGadget) -> anyhow::Result<Self> {
#[cfg(not(coverage))]
if runtime_support::allow_gadget_cycle() {
info!("🛠️ Initial USB reset…");
let _ = gadget.cycle(); // ignore failure - may boot without host
}
#[cfg(not(coverage))]
{
if !runtime_support::allow_gadget_cycle() {
info!(
"🔒 gadget cycle disabled at startup (set LESAVKA_ALLOW_GADGET_CYCLE=1 to enable)"
);
}
info!("🛠️ opening HID endpoints …");
}
let kb = runtime_support::open_with_retry(&hid_endpoint(0)).await?;
let ms = runtime_support::open_with_retry(&hid_endpoint(1)).await?;
#[cfg(not(coverage))]
info!("✅ HID endpoints ready");
Ok(Self {
kb: Arc::new(Mutex::new(kb)),
ms: Arc::new(Mutex::new(ms)),
gadget,
did_cycle: Arc::new(AtomicBool::new(false)),
camera_rt: Arc::new(CameraRuntime::new()),
capture_power: CapturePowerManager::new(),
})
}
async fn reopen_hid(&self) -> anyhow::Result<()> {
let kb_new = runtime_support::open_with_retry(&hid_endpoint(0)).await?;
let ms_new = runtime_support::open_with_retry(&hid_endpoint(1)).await?;
*self.kb.lock().await = kb_new;
*self.ms.lock().await = ms_new;
Ok(())
}
async fn capture_video_reply(
&self,
req: MonitorRequest,
) -> Result<Response<VideoStream>, Status> {
let id = req.id;
let dev = match id {
0 => "/dev/lesavka_l_eye",
1 => "/dev/lesavka_r_eye",
_ => return Err(Status::invalid_argument("monitor id must be 0 or 1")),
};
#[cfg(not(coverage))]
{
let rpc_id = runtime_support::next_stream_id();
info!(
rpc_id,
id,
max_bitrate = req.max_bitrate,
requested_width = req.requested_width,
requested_height = req.requested_height,
requested_fps = req.requested_fps,
"🎥 capture_video opened"
);
debug!(rpc_id, "🎥 streaming {dev}");
}
let lease = self.capture_power.acquire().await;
let stream = video::eye_ball_with_request(
dev,
id,
req.max_bitrate,
req.requested_width,
req.requested_height,
req.requested_fps,
)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(Box::pin(GuardedVideoStream {
inner: stream,
_lease: lease,
})))
}
async fn paste_text_reply(
&self,
req: Request<PasteRequest>,
) -> Result<Response<PasteReply>, Status> {
let req = req.into_inner();
let text = paste::decrypt(&req).map_err(|e| Status::unauthenticated(format!("{e}")))?;
if let Err(e) = paste::type_text(self.kb.as_ref(), &text).await {
return Ok(Response::new(PasteReply {
ok: false,
error: format!("{e}"),
}));
}
Ok(Response::new(PasteReply {
ok: true,
error: String::new(),
}))
}
async fn reset_usb_reply(&self) -> Result<Response<ResetUsbReply>, Status> {
#[cfg(not(coverage))]
info!("🔴 explicit ResetUsb() called");
match self.gadget.cycle() {
Ok(_) => {
if let Err(e) = self.reopen_hid().await {
#[cfg(not(coverage))]
error!("💥 reopen HID failed: {e:#}");
return Err(Status::internal(e.to_string()));
}
Ok(Response::new(ResetUsbReply { ok: true }))
}
Err(e) => {
#[cfg(not(coverage))]
error!("💥 cycle failed: {e:#}");
Err(Status::internal(e.to_string()))
}
}
}
async fn get_capture_power_reply(&self) -> Result<Response<CapturePowerState>, Status> {
self.capture_power
.snapshot()
.await
.map(Response::new)
.map_err(|e| Status::internal(format!("{e:#}")))
}
async fn set_capture_power_reply(
&self,
req: Request<SetCapturePowerRequest>,
) -> Result<Response<CapturePowerState>, Status> {
let req = req.into_inner();
let result = match CapturePowerCommand::try_from(req.command)
.unwrap_or(CapturePowerCommand::Unspecified)
{
CapturePowerCommand::Auto => self.capture_power.set_auto().await,
CapturePowerCommand::ForceOn => self.capture_power.set_manual(true).await,
CapturePowerCommand::ForceOff => self.capture_power.set_manual(false).await,
CapturePowerCommand::Unspecified => self.capture_power.set_manual(req.enabled).await,
};
result
.map(Response::new)
.map_err(|e| Status::internal(format!("{e:#}")))
}
}
struct GuardedVideoStream<S> {
inner: S,
_lease: lesavka_server::capture_power::CapturePowerLease,
}
impl<S> Stream for GuardedVideoStream<S>
where
S: Stream<Item = Result<VideoPacket, Status>> + Unpin,
{
type Item = Result<VideoPacket, Status>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
/*──────────────── gRPC service ─────────────*/
#[cfg(not(coverage))]
#[tonic::async_trait]
impl Relay for Handler {
/* existing streams ─ unchanged, except: no more auto-reset */
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = VideoStream;
type CaptureAudioStream = AudioStream;
type StreamMicrophoneStream = ReceiverStream<Result<Empty, Status>>;
type StreamCameraStream = ReceiverStream<Result<Empty, Status>>;
async fn stream_keyboard(
&self,
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
let rpc_id = runtime_support::next_stream_id();
info!(rpc_id, "⌨️ stream_keyboard opened");
let (tx, rx) = tokio::sync::mpsc::channel(32);
let kb = self.kb.clone();
let ms = self.ms.clone();
let gadget = self.gadget.clone();
let did_cycle = self.did_cycle.clone();
let session_lease = self.capture_power.acquire_session().await;
let report_delay = live_keyboard_report_delay();
tokio::spawn(async move {
let _session_lease = session_lease;
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if let Err(e) = runtime_support::write_hid_report(&kb, &pkt.data).await {
if e.raw_os_error() == Some(libc::EAGAIN) {
debug!(rpc_id, "⌨️ write would block (dropped)");
} else {
warn!(rpc_id, "⌨️ write failed: {e} (dropped)");
runtime_support::recover_hid_if_needed(
&e,
gadget.clone(),
kb.clone(),
ms.clone(),
did_cycle.clone(),
)
.await;
}
}
tx.send(Ok(pkt)).await.ok();
if !report_delay.is_zero() {
tokio::time::sleep(report_delay).await;
}
}
info!(rpc_id, "⌨️ stream_keyboard closed");
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
let rpc_id = runtime_support::next_stream_id();
info!(rpc_id, "🖱️ stream_mouse opened");
let (tx, rx) = tokio::sync::mpsc::channel(1024);
let ms = self.ms.clone();
let kb = self.kb.clone();
let gadget = self.gadget.clone();
let did_cycle = self.did_cycle.clone();
let session_lease = self.capture_power.acquire_session().await;
tokio::spawn(async move {
let _session_lease = session_lease;
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if let Err(e) = runtime_support::write_hid_report(&ms, &pkt.data).await {
if e.raw_os_error() == Some(libc::EAGAIN) {
debug!(rpc_id, "🖱️ write would block (dropped)");
} else {
warn!(rpc_id, "🖱️ write failed: {e} (dropped)");
runtime_support::recover_hid_if_needed(
&e,
gadget.clone(),
kb.clone(),
ms.clone(),
did_cycle.clone(),
)
.await;
}
}
tx.send(Ok(pkt)).await.ok();
}
info!(rpc_id, "🖱️ stream_mouse closed");
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_microphone(
&self,
req: Request<tonic::Streaming<AudioPacket>>,
) -> Result<Response<Self::StreamMicrophoneStream>, Status> {
let rpc_id = runtime_support::next_stream_id();
info!(rpc_id, "🎤 stream_microphone opened");
// 1 ─ build once, early
let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
info!(%uac_dev, "🎤 stream_microphone using UAC sink");
let mut sink = runtime_support::open_voice_with_retry(&uac_dev)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
// 2 ─ dummy outbound stream (same trick as before)
let (tx, rx) = tokio::sync::mpsc::channel(1);
// 3 ─ drive the sink in a background task
tokio::spawn(async move {
let mut inbound = req.into_inner();
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
while let Some(pkt) = inbound.next().await.transpose()? {
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 {
tracing::trace!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len());
}
sink.push(&pkt);
}
sink.finish(); // flush on EOS
let _ = tx.send(Ok(Empty {})).await;
info!(rpc_id, "🎤 stream_microphone closed");
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_camera(
&self,
req: Request<tonic::Streaming<VideoPacket>>,
) -> Result<Response<Self::StreamCameraStream>, Status> {
let rpc_id = runtime_support::next_stream_id();
let cfg = camera::current_camera_config();
info!(
rpc_id,
output = cfg.output.as_str(),
codec = cfg.codec.as_str(),
width = cfg.width,
height = cfg.height,
fps = cfg.fps,
hdmi = cfg.hdmi.as_ref().map(|h| h.name.as_str()).unwrap_or("none"),
"🎥 stream_camera output selected"
);
let (session_id, relay) = self.camera_rt.activate(&cfg).await?;
let camera_rt = self.camera_rt.clone();
info!(rpc_id, session_id, "🎥 stream_camera opened");
// dummy outbound (same pattern as other streams)
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if !camera_rt.is_active(session_id) {
info!(rpc_id, session_id, "🎥 stream_camera session superseded");
break;
}
relay.feed(pkt); // ← all logging inside video.rs
}
tx.send(Ok(Empty {})).await.ok();
info!(rpc_id, session_id, "🎥 stream_camera closed");
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn capture_video(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
self.capture_video_reply(req.into_inner()).await
}
async fn capture_audio(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureAudioStream>, Status> {
let rpc_id = runtime_support::next_stream_id();
// Only one speaker stream for now; both 0/1 → same ALSA dev.
let _id = req.into_inner().id;
// Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging).
let dev = std::env::var("LESAVKA_ALSA_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
info!(rpc_id, %dev, "🔊 capture_audio opened");
let s = audio::ear(&dev, 0)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(Box::pin(s)))
}
async fn paste_text(&self, req: Request<PasteRequest>) -> Result<Response<PasteReply>, Status> {
self.paste_text_reply(req).await
}
/*────────────── USB-reset RPC ────────────*/
async fn reset_usb(&self, _req: Request<Empty>) -> Result<Response<ResetUsbReply>, Status> {
self.reset_usb_reply().await
}
async fn get_capture_power(
&self,
_req: Request<Empty>,
) -> Result<Response<CapturePowerState>, Status> {
self.get_capture_power_reply().await
}
async fn set_capture_power(
&self,
req: Request<SetCapturePowerRequest>,
) -> Result<Response<CapturePowerState>, Status> {
self.set_capture_power_reply(req).await
}
}
#[cfg(coverage)]
#[tonic::async_trait]
impl Relay for Handler {
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = VideoStream;
type CaptureAudioStream = AudioStream;
type StreamMicrophoneStream = ReceiverStream<Result<Empty, Status>>;
type StreamCameraStream = ReceiverStream<Result<Empty, Status>>;
async fn stream_keyboard(
&self,
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(32);
let kb = self.kb.clone();
let report_delay = live_keyboard_report_delay();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
let _ = runtime_support::write_hid_report(&kb, &pkt.data).await;
tx.send(Ok(pkt)).await.ok();
if !report_delay.is_zero() {
tokio::time::sleep(report_delay).await;
}
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(32);
let ms = self.ms.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
let _ = runtime_support::write_hid_report(&ms, &pkt.data).await;
tx.send(Ok(pkt)).await.ok();
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_microphone(
&self,
_req: Request<tonic::Streaming<AudioPacket>>,
) -> Result<Response<Self::StreamMicrophoneStream>, Status> {
Err(Status::internal(
"microphone sink unavailable in coverage harness",
))
}
async fn stream_camera(
&self,
_req: Request<tonic::Streaming<VideoPacket>>,
) -> Result<Response<Self::StreamCameraStream>, Status> {
Err(Status::internal(
"camera stream unavailable in coverage harness",
))
}
async fn capture_video(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
self.capture_video_reply(req.into_inner()).await
}
async fn capture_audio(
&self,
_req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureAudioStream>, Status> {
Err(Status::internal(
"audio capture unavailable in coverage harness",
))
}
async fn paste_text(&self, req: Request<PasteRequest>) -> Result<Response<PasteReply>, Status> {
self.paste_text_reply(req).await
}
async fn reset_usb(&self, _req: Request<Empty>) -> Result<Response<ResetUsbReply>, Status> {
self.reset_usb_reply().await
}
async fn get_capture_power(
&self,
_req: Request<Empty>,
) -> Result<Response<CapturePowerState>, Status> {
self.get_capture_power_reply().await
}
async fn set_capture_power(
&self,
req: Request<SetCapturePowerRequest>,
) -> Result<Response<CapturePowerState>, Status> {
self.set_capture_power_reply(req).await
}
}
/*──────────────── main ───────────────────────*/
#[cfg(not(coverage))]
#[tokio::main(worker_threads = 4)]
async fn main() -> anyhow::Result<()> {
let _guard = init_tracing()?;
info!("🚀 {} v{} starting up", PKG_NAME, lesavka_server::VERSION);
panic::set_hook(Box::new(|p| {
let bt = Backtrace::force_capture();
error!("💥 panic: {p}\n{bt}");
}));
let gadget = UsbGadget::new("lesavka");
if std::env::var("LESAVKA_DISABLE_UVC").is_err() {
if std::env::var("LESAVKA_UVC_EXTERNAL").is_ok() {
info!("📷 UVC control helper external; not spawning");
} else {
let bin = uvc_runtime::uvc_ctrl_bin();
tokio::spawn(uvc_runtime::supervise_uvc_control(bin));
}
} else {
info!("📷 UVC disabled (LESAVKA_DISABLE_UVC set)");
}
let handler = Handler::new(gadget.clone()).await?;
info!("🌐 lesavka-server listening on 0.0.0.0:50051");
Server::builder()
.tcp_nodelay(true)
.max_frame_size(Some(2 * 1024 * 1024))
.add_service(RelayServer::new(handler))
.add_service(HandshakeSvc::server())
.add_service(ReflBuilder::configure().build_v1().unwrap())
.serve(([0, 0, 0, 0], 50051).into())
.await?;
Ok(())
}
#[cfg(coverage)]
#[tokio::main(worker_threads = 2)]
async fn main() -> anyhow::Result<()> {
let gadget = UsbGadget::new("lesavka");
let _handler = Handler::new(gadget).await?;
Err(anyhow::anyhow!("coverage mode skips live gRPC serve loop"))
}