camera core add

This commit is contained in:
Brad Stein 2025-07-03 15:22:30 -05:00
parent 6c3942cd2c
commit 2fdfa7e597
4 changed files with 117 additions and 15 deletions

View File

@ -21,6 +21,7 @@ use lesavka_common::lesavka::{
use crate::{input::inputs::InputAggregator, use crate::{input::inputs::InputAggregator,
input::microphone::MicrophoneCapture, input::microphone::MicrophoneCapture,
input::camera::CameraCapture,
output::video::MonitorWindow, output::video::MonitorWindow,
output::audio::AudioOut}; output::audio::AudioOut};
@ -46,6 +47,14 @@ impl LesavkaClientApp {
let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
agg.init()?; // grab devices immediately agg.init()?; // grab devices immediately
let cam = if std::env::var("LESAVKA_CAM_DISABLE").is_ok() {
None
} else {
Some(Arc::new(CameraCapture::new(
std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref()
)?))
};
Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx }) Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx })
} }
@ -134,6 +143,14 @@ impl LesavkaClientApp {
let ep_mic = vid_ep.clone(); let ep_mic = vid_ep.clone();
tokio::spawn(Self::mic_loop(ep_mic, mic)); tokio::spawn(Self::mic_loop(ep_mic, mic));
/*────────── webcam gRPC pusher ───────────────*/
if !std::env::var("LESAVKA_CAM_DISABLE").is_ok() {
let cam = Arc::new(CameraCapture::new(
std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref()
)?);
tokio::spawn(Self::cam_loop(vid_ep.clone(), cam));
}
/*────────── central reactor ───────────────────*/ /*────────── central reactor ───────────────────*/
tokio::select! { tokio::select! {
_ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); }, _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); },
@ -295,5 +312,39 @@ impl LesavkaClientApp {
let _ = stop_tx.send(()); let _ = stop_tx.send(());
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
} }
} }
/*──────────────── cam stream ───────────────────*/
async fn cam_loop(ep: Channel, cam: Arc<CameraCapture>) {
loop {
let mut cli = RelayClient::new(ep.clone());
// pull frames in a real thread so we dont block async scheduler
let (tx_pkt, rx_pkt) = tokio::sync::mpsc::channel::<VideoPacket>(256);
std::thread::spawn({
let cam = cam.clone();
move || {
while let Some(pkt) = cam.pull() {
// TRACE every 120 frames only
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 120 == 0 {
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
}
let _ = tx_pkt.blocking_send(pkt);
}
}
});
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx_pkt);
match cli.stream_camera(Request::new(outbound)).await {
Ok(mut resp) => {
while resp.get_mut().message().await.transpose().is_some() {}
}
Err(e) => tracing::warn!("❌📸 connect failed: {e}"),
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
} }

View File

@ -22,16 +22,30 @@ impl CameraCapture {
.and_then(Self::find_device) .and_then(Self::find_device)
.unwrap_or_else(|| "/dev/video0".into()); .unwrap_or_else(|| "/dev/video0".into());
let desc = format!( // let (enc, raw_caps) = Self::pick_encoder();
"v4l2src device={dev} io-mode=dmabuf do-timestamp=true ! \ // (NVIDIA → VA-API → software x264).
videoconvert ! videoscale ! video/x-raw,width=1280,height=720 ! \ let (enc, kf_prop, kf_val) = Self::choose_encoder();
v4l2h264enc key-int-max=30 \ tracing::info!("📸 using encoder element: {enc}");
extra-controls=\"encode,frame_level_rate_control_enable=1,h264_profile=4\" ! \
h264parse config-interval=-1 ! \ // let desc = format!(
appsink name=asink emit-signals=true max-buffers=60 drop=true" // "v4l2src device={dev} do-timestamp=true ! {raw_caps},width=1280,height=720 ! \
); // videoconvert ! {enc} key-int-max=30 ! \
// h264parse config-interval=-1 ! \
// appsink name=asink emit-signals=true max-buffers=60 drop=true"
// );
// tracing::debug!(%desc, "📸 pipeline-desc");
// Build a pipeline that works for any of the three encoders.
// * nvh264enc needs NVMM memory caps;
// * vaapih264enc wants system-memory caps;
// * x264enc needs the usual raw caps.
let desc = format!(
"v4l2src device={dev} do-timestamp=true ! \
videoconvert ! {enc} {kf_prop}={kf_val} ! \
h264parse config-interval=-1 ! \
appsink name=asink emit-signals=true max-buffers=60 drop=true"
);
tracing::info!(%enc, ?desc, "📸 using encoder element");
tracing::debug!("📸 pipelinedesc:\n{desc}");
let pipeline: gst::Pipeline = gst::parse::launch(&desc) let pipeline: gst::Pipeline = gst::parse::launch(&desc)
.context("gst parse_launch(cam)")? .context("gst parse_launch(cam)")?
.downcast::<gst::Pipeline>() .downcast::<gst::Pipeline>()
@ -55,7 +69,7 @@ impl CameraCapture {
let buf = sample.buffer()?; let buf = sample.buffer()?;
let map = buf.map_readable().ok()?; let map = buf.map_readable().ok()?;
let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
Some(VideoPacket { id: 100, pts, data: map.as_slice().to_vec() }) Some(VideoPacket { id: 0, pts, data: map.as_slice().to_vec() })
} }
/// Fuzzymatch devices under `/dev/v4l/by-id` /// Fuzzymatch devices under `/dev/v4l/by-id`
@ -82,4 +96,32 @@ impl CameraCapture {
.unwrap(); .unwrap();
Self { pipeline, sink } Self { pipeline, sink }
} }
fn pick_encoder() -> (&'static str, &'static str) {
let encoders = &[
("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"),
("vaapih264enc","video/x-raw,format=NV12"),
("v4l2h264enc", "video/x-raw"), // RPi, Jetson, etc.
("x264enc", "video/x-raw"), // software
];
for (name, caps) in encoders {
if gst::ElementFactory::find(name).is_some() {
return (name, caps);
}
}
// last resort software
("x264enc", "video/x-raw")
}
    /// Return the encoder element *and* the (property, value) pair we must
    /// set on it to get an IDR key-frame every 30 frames.
    ///
    /// Preference order: NVIDIA NVENC → VA-API → software x264. Each encoder
    /// names its key-frame-interval property differently, hence the triple.
    fn choose_encoder() -> (&'static str, &'static str, &'static str) {
        if gst::ElementFactory::find("nvh264enc").is_some() {
            ("nvh264enc", "gop-size", "30") // NVENC: GOP length property
        } else if gst::ElementFactory::find("vaapih264enc").is_some() {
            ("vaapih264enc", "keyframe-period", "30") // VA-API: keyframe period
        } else {
            ("x264enc", "key-int-max", "30") // libx264 software fallback
        }
    }
} }

View File

@ -3,7 +3,7 @@
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use evdev::{Device, EventType, KeyCode, RelativeAxisCode}; use evdev::{Device, EventType, KeyCode, RelativeAxisCode};
use tokio::{sync::broadcast::Sender, time::{interval, Duration}}; use tokio::{sync::broadcast::Sender, time::{interval, Duration}};
use tracing::{debug, info, warn}; use tracing::{debug, info, warn, trace};
use lesavka_common::lesavka::{KeyboardReport, MouseReport}; use lesavka_common::lesavka::{KeyboardReport, MouseReport};
@ -102,7 +102,11 @@ impl InputAggregator {
Some(c) Some(c)
} }
Some(Err(e)) => { Some(Err(e)) => {
warn!("📸 webcam disabled: {e:#}"); warn!(
"📸 webcam disabled {:?}. \
Hint: install gstpluginsbad or provide LESAVKA_CAM_DISABLE=1",
e.root_cause()
);
Some(CameraCapture::new_stub()) // keep stub so callsites compile Some(CameraCapture::new_stub()) // keep stub so callsites compile
} }
None => { None => {
@ -149,7 +153,7 @@ impl InputAggregator {
} }
if let Some(cam) = &self.camera { if let Some(cam) = &self.camera {
debug!("📸 CameraCapture present first pull() will block until a frame arrives"); trace!("📸 CameraCapture present first pull() will block until a frame arrives");
} else { } else {
debug!("📸 No camera pipeline active"); debug!("📸 No camera pipeline active");
} }

View File

@ -282,13 +282,18 @@ impl CameraRelay {
/// Push one VideoPacket coming from the client /// Push one VideoPacket coming from the client
pub fn feed(&self, pkt: VideoPacket) { pub fn feed(&self, pkt: VideoPacket) {
let n = self.frames.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let n = self.frames.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 { if n < 10 || n % 60 == 0 {
tracing::debug!(target:"lesavka_server::video", tracing::debug!(target:"lesavka_server::video",
cam_id = self.id, cam_id = self.id,
frame = n, frame = n,
bytes = pkt.data.len(), bytes = pkt.data.len(),
pts = pkt.pts, pts = pkt.pts,
"📸 srv webcam frame"); "📸 srv webcam frame");
} else if n % 10 == 0 {
tracing::trace!(target:"lesavka_server::video",
cam_id = self.id,
bytes = pkt.data.len(),
"📥 srv pkt");
} }
self.sink.push(pkt); self.sink.push(pkt);
} }