more logging

This commit is contained in:
Brad Stein 2025-07-04 01:56:59 -05:00
parent 9fc17478b2
commit 84790ea470
9 changed files with 140 additions and 76 deletions

View File

@ -19,7 +19,8 @@ use lesavka_common::lesavka::{
MonitorRequest, MouseReport, VideoPacket, AudioPacket
};
use crate::{input::inputs::InputAggregator,
use crate::{handshake,
input::inputs::InputAggregator,
input::microphone::MicrophoneCapture,
input::camera::CameraCapture,
output::video::MonitorWindow,
@ -47,18 +48,14 @@ impl LesavkaClientApp {
let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
agg.init()?; // grab devices immediately
let cam = if std::env::var("LESAVKA_CAM_DISABLE").is_ok() {
None
} else {
Some(Arc::new(CameraCapture::new(
std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref()
)?))
};
Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx })
}
pub async fn run(&mut self) -> Result<()> {
/*────────── handshake / feature-negotiation ───────────────*/
let caps = handshake::negotiate(&self.server_addr).await;
tracing::info!("🤝 server capabilities = {:?}", caps);
/*────────── persistent gRPC channels ──────────*/
let hid_ep = Channel::from_shared(self.server_addr.clone())?
.tcp_nodelay(true)
@ -137,13 +134,17 @@ impl LesavkaClientApp {
let ep_audio = vid_ep.clone();
tokio::spawn(Self::audio_loop(ep_audio, audio_out));
/*────────── camera & mic tasks ───────────────*/
let cam = Arc::new(CameraCapture::new(
std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref()
)?);
tokio::spawn(Self::cam_loop(vid_ep.clone(), cam));
/*────────── camera & mic tasks (gated by caps) ───────────*/
if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() {
let cam = Arc::new(CameraCapture::new(
std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref()
)?);
tokio::spawn(Self::cam_loop(vid_ep.clone(), cam));
if caps.microphone {
let mic = Arc::new(MicrophoneCapture::new()?);
tokio::spawn(Self::mic_loop(vid_ep.clone(), mic));
}
}
let mic = Arc::new(MicrophoneCapture::new()?);
tokio::spawn(Self::mic_loop(vid_ep.clone(), mic));
@ -269,6 +270,7 @@ impl LesavkaClientApp {
/*──────────────── mic stream ─────────────────*/
async fn mic_loop(ep: Channel, mic: Arc<MicrophoneCapture>) {
let mut delay = Duration::from_secs(1);
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
loop {
let mut cli = RelayClient::new(ep.clone());
@ -290,12 +292,11 @@ impl LesavkaClientApp {
// 3. turn `rx` into an async stream for gRPC
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
match cli.stream_microphone(Request::new(outbound)).await {
Ok(mut resp) => {
while resp.get_mut().message().await.transpose().is_some() {}
}
Err(e) => {
match cli.stream_microphone(Request::new(outbound)).await {
Ok(mut resp) => {
while resp.get_mut().message().await.transpose().is_some() {}
}
Err(e) => {
// first failure → warn, subsequent ones → debug
if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
warn!("❌🎤 connect failed: {e}");
@ -303,15 +304,17 @@ impl LesavkaClientApp {
} else {
debug!("❌🎤 reconnect failed: {e}");
}
delay = next_delay(delay);
}
}
let _ = stop_tx.send(());
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(delay).await;
}
}
/*──────────────── cam stream ───────────────────*/
async fn cam_loop(ep: Channel, cam: Arc<CameraCapture>) {
let mut delay = Duration::from_secs(1);
loop {
let mut cli = RelayClient::new(ep.clone());
let (tx, rx) = tokio::sync::mpsc::channel::<VideoPacket>(256);
@ -327,16 +330,33 @@ impl LesavkaClientApp {
if n < 10 || n % 120 == 0 {
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
}
let _ = tx.blocking_send(pkt);
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B",
pkt.pts, pkt.data.len());
let _ = tx.try_send(pkt);
}
}
});
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
if let Err(e) = cli.stream_camera(Request::new(outbound)).await {
tracing::warn!("❌📸 connect failed: {e}");
match cli.stream_camera(Request::new(outbound)).await {
Ok(_) => delay = Duration::from_secs(1), // got a stream → reset
Err(e) if e.code() == tonic::Code::Unimplemented => {
tracing::warn!("📸 server does not support StreamCamera giving up");
return; // stop the task completely (#3)
}
Err(e) => {
tracing::warn!("❌📸 connect failed: {e:?}");
delay = next_delay(delay); // back-off (#2)
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(delay).await;
}
}
}
/// Exponential back-off step: double the current delay while it is in the
/// 1–15 s band, otherwise clamp to the 30 s ceiling. Sub-second durations
/// truncate to 0 whole seconds and therefore also clamp to the ceiling.
fn next_delay(cur: std::time::Duration) -> std::time::Duration {
    const CEILING: std::time::Duration = std::time::Duration::from_secs(30);
    if (1..=15).contains(&cur.as_secs()) {
        cur * 2
    } else {
        CEILING
    }
}

29
client/src/handshake.rs Normal file
View File

@ -0,0 +1,29 @@
// client/src/handshake.rs
#![forbid(unsafe_code)]
use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient};
use tonic::Code;
/// Optional server capabilities learned during the handshake.
///
/// `Default` (all `false`) stands in for an old server that predates the
/// `Handshake` service — the client then enables no optional features.
#[derive(Default, Clone, Copy, Debug)]
pub struct PeerCaps {
    // When set, the client spawns its camera upload task.
    pub camera: bool,
    // When set, the client spawns its microphone upload task.
    pub microphone: bool,
}
/// Dial the server's `Handshake` service and ask which optional
/// capabilities it supports.
///
/// Backward compatibility: an old server without the `Handshake` service
/// answers `Unimplemented`; that is treated as "no optional capabilities"
/// (`PeerCaps::default()`) rather than an error.
///
/// # Panics
/// Panics if the channel cannot be dialed, or if the RPC fails with any
/// status other than `Unimplemented`.
pub async fn negotiate(uri: &str) -> PeerCaps {
    let mut cli = HandshakeClient::connect(uri.to_owned())
        .await
        // Fix: the message was `"\"dial handshake\""`, which rendered
        // with stray literal quotes in the panic output.
        .expect("dial handshake");
    match cli.get_capabilities(pb::Empty {}).await {
        Ok(rsp) => PeerCaps {
            camera: rsp.get_ref().camera,
            microphone: rsp.get_ref().microphone,
        },
        Err(e) if e.code() == Code::Unimplemented => {
            // ↺ old server — pretend it supports nothing special.
            PeerCaps::default()
        }
        // Same fix as above: drop the escaped quotes around the message.
        Err(e) => panic!("handshake failed: {e}"),
    }
}

View File

@ -26,6 +26,17 @@ impl CameraCapture {
// (NVIDIA → VA-API → software x264).
let (enc, kf_prop, kf_val) = Self::choose_encoder();
tracing::info!("📸 using encoder element: {enc}");
let (src_caps, preenc) = match enc {
"nvh264enc" => (
"video/x-raw(memory:NVMM),format=NV12,width=1280,height=720",
"nvvidconv !"
),
"vaapih264enc" => (
"video/x-raw,format=NV12,width=1280,height=720",
"videoconvert !"
),
_ => ("video/x-raw,width=1280,height=720", "videoconvert !"),
};
// let desc = format!(
// "v4l2src device={dev} do-timestamp=true ! {raw_caps},width=1280,height=720 ! \
@ -39,9 +50,10 @@ impl CameraCapture {
// * vaapih264enc wants system-memory caps;
// * x264enc needs the usual raw caps.
let desc = format!(
"v4l2src device={dev} do-timestamp=true ! \
videoconvert ! {enc} {kf_prop}={kf_val} ! \
h264parse config-interval=-1 ! \
"v4l2src device={dev} do-timestamp=true ! {src_caps} ! \
{preenc} {enc} {kf_prop}={kf_val} ! \
h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! \
queue max-size-buffers=30 leaky=downstream ! \
appsink name=asink emit-signals=true max-buffers=60 drop=true"
);
tracing::info!(%enc, ?desc, "📸 using encoder element");
@ -97,6 +109,7 @@ impl CameraCapture {
Self { pipeline, sink }
}
#[allow(dead_code)] // helper kept for future heuristics
fn pick_encoder() -> (&'static str, &'static str) {
let encoders = &[
("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"),
@ -113,15 +126,15 @@ impl CameraCapture {
("x264enc", "video/x-raw")
}
/// Return the encoder element *and* the property/key-frame pair we must
/// set to get an IDR every 30 frames.
fn choose_encoder() -> (&'static str, &'static str, &'static str) {
if gst::ElementFactory::find("nvh264enc").is_some() {
("nvh264enc", "gop-size", "30") // NVIDIA NVENC uses `gop-size` :contentReference[oaicite:0]{index=0}
} else if gst::ElementFactory::find("vaapih264enc").is_some() {
("vaapih264enc", "keyframe-period", "30")// VA-API uses `keyframe-period` :contentReference[oaicite:1]{index=1}
} else {
("x264enc", "key-int-max", "30") // libx264 fallback :contentReference[oaicite:2]{index=2}
match () {
_ if gst::ElementFactory::find("nvh264enc").is_some() =>
("nvh264enc", "gop-size", "30"),
_ if gst::ElementFactory::find("vaapih264enc").is_some() =>
("vaapih264enc","keyframe-period","30"),
_ if gst::ElementFactory::find("v4l2h264enc").is_some() =>
("v4l2h264enc","idrcount", "30"),
_ => ("x264enc", "key-int-max", "30"),
}
}
}

View File

@ -7,7 +7,7 @@ use tracing::{debug, info, warn, trace};
use lesavka_common::lesavka::{KeyboardReport, MouseReport};
use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator, camera::CameraCapture};
use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator};
use crate::layout::{Layout, apply as apply_layout};
pub struct InputAggregator {
@ -18,7 +18,6 @@ pub struct InputAggregator {
magic_active: bool,
keyboards: Vec<KeyboardAggregator>,
mice: Vec<MouseAggregator>,
camera: Option<CameraCapture>,
}
impl InputAggregator {
@ -26,8 +25,8 @@ impl InputAggregator {
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>) -> Self {
Self { kbd_tx, mou_tx, dev_mode, released: false, magic_active: false,
keyboards: Vec::new(), mice: Vec::new(),
camera: None }
keyboards: Vec::new(), mice: Vec::new()
}
}
/// Called once at startup: enumerates input devices,
@ -92,29 +91,6 @@ impl InputAggregator {
bail!("No suitable keyboard/mouse devices found or none grabbed.");
}
// Autoselect webcam (may be toggled later through magic chord)
let cam_src_env = std::env::var("LESAVKA_CAM_SOURCE").ok();
self.camera = match cam_src_env.as_deref()
// .or(device_fragment) // <- if you later wire CLI arg
.map(|s| CameraCapture::new(Some(s))) {
Some(Ok(c)) => {
info!("📸 webcam enabled (device fragment = {:?})", cam_src_env);
Some(c)
}
Some(Err(e)) => {
warn!(
"📸 webcam disabled {:?}. \
Hint: install gstpluginsbad or provide LESAVKA_CAM_DISABLE=1",
e.root_cause()
);
Some(CameraCapture::new_stub()) // keep stub so callsites compile
}
None => {
info!("📸 webcam disabled (no CAM_SOURCE set)");
None // or Stub your choice
}
};
Ok(())
}
@ -152,12 +128,6 @@ impl InputAggregator {
mouse.process_events();
}
if let Some(cam) = &self.camera {
trace!("📸 CameraCapture present first pull() will block until a frame arrives");
} else {
debug!("📸 No camera pipeline active");
}
self.magic_active = magic_now;
tick.tick().await;
}

View File

@ -30,12 +30,24 @@ impl MicrophoneCapture {
_ => String::new(),
};
debug!("🎤 device: {device_arg}");
let aac = ["avenc_aac", "fdkaacenc", "faac", "opusenc"]
.into_iter()
.find(|e| gst::ElementFactory::find(e).is_some())
.unwrap_or("opusenc");
let parser = if aac.contains("opus") {
// opusenc already outputs raw Opus frames just state the caps
"capsfilter caps=audio/x-opus,rate=48000,channels=2"
} else {
// AAC → ADTS frames
"aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2"
};
let desc = format!(
"pulsesrc {device_arg} do-timestamp=true ! \
audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
audioconvert ! audioresample ! avenc_aac bitrate=128000 ! \
aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2 ! \
appsink name=asink emit-signals=true max-buffers=50 drop=true"
audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
audioconvert ! audioresample ! {aac} bitrate=128000 ! \
{parser} ! \
queue max-size-buffers=100 leaky=downstream ! \
appsink name=asink emit-signals=true max-buffers=50 drop=true"
);
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?

View File

@ -6,5 +6,6 @@ pub mod app;
pub mod input;
pub mod output;
pub mod layout;
pub mod handshake;
pub use app::LesavkaClientApp;

View File

@ -1,3 +1,4 @@
// proto/lesavka.proto
syntax = "proto3";
package lesavka;
@ -10,6 +11,8 @@ message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; }
message ResetUsbReply { bool ok = 1; } // true = success
message HandshakeSet { bool camera = 1; bool microphone = 2; }
message Empty {}
service Relay {
@ -22,3 +25,7 @@ service Relay {
rpc ResetUsb (Empty) returns (ResetUsbReply);
}
service Handshake {
rpc GetCapabilities (Empty) returns (HandshakeSet);
}

View File

@ -122,7 +122,7 @@ After=network.target lesavka-core.service
[Service]
ExecStart=/usr/local/bin/lesavka-server
Restart=always
Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=info,lesavka_server::video=trace,lesavka_server::gadget=info
Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=trace,lesavka_server::video=trace,lesavka_server::gadget=info
Environment=RUST_BACKTRACE=1
Environment=GST_DEBUG="*:2,alsasink:6,alsasrc:6"
Restart=always

View File

@ -295,6 +295,18 @@ impl CameraRelay {
bytes = pkt.data.len(),
"📸📥 srv pkt");
}
if cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE) {
if n % 120 == 0 {
let path = format!("/tmp/eye3-cli-{n:05}.h264");
if let Err(e) = std::fs::write(&path, &pkt.data) {
tracing::warn!("📸💾 dump failed: {e}");
} else {
tracing::debug!("📸💾 wrote {}", path);
}
}
}
self.sink.push(pkt);
}
}