diff --git a/client/src/app.rs b/client/src/app.rs
index 68b5326..0dcef32 100644
--- a/client/src/app.rs
+++ b/client/src/app.rs
@@ -5,7 +5,7 @@ use std::time::Duration;
 use tokio::sync::broadcast;
 use tokio_stream::{wrappers::BroadcastStream, StreamExt};
 use tonic::{transport::Channel, Request};
-use tracing::{error, info, warn};
+use tracing::{error, trace, debug, info, warn};
 use winit::{
     event::Event,
     event_loop::{EventLoopBuilder, ControlFlow},
@@ -203,23 +203,23 @@ impl LesavkaClientApp {
            let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 };
            match cli.capture_video(Request::new(req)).await {
                Ok(mut stream) => {
-                    tracing::info!("πŸŽ₯ cli video{monitor_id}: stream opened");
+                    info!("πŸŽ₯ cli video{monitor_id}: stream opened");
                    while let Some(res) = stream.get_mut().message().await.transpose() {
                        match res {
                            Ok(pkt) => {
-                                tracing::debug!("πŸŽ₯ cli video{monitor_id}: got {}β€―bytes", pkt.data.len());
+                                trace!("πŸŽ₯ cli video{monitor_id}: got {}β€―bytes", pkt.data.len());
                                if tx.send(pkt).is_err() {
-                                    tracing::warn!("⚠️πŸŽ₯ cli video{monitor_id}: GUI thread gone");
+                                    warn!("⚠️πŸŽ₯ cli video{monitor_id}: GUI thread gone");
                                    break;
                                }
                            }
                            Err(e) => {
-                                tracing::error!("❌πŸŽ₯ cli video{monitor_id}: gRPC error: {e}");
+                                error!("❌πŸŽ₯ cli video{monitor_id}: gRPC error: {e}");
                                break;
                            }
                        }
                    }
-                    tracing::warn!("⚠️πŸŽ₯ li video{monitor_id}: stream ended");
+                    warn!("⚠️πŸŽ₯ cli video{monitor_id}: stream ended");
                }
                Err(e) => error!("❌πŸŽ₯ video {monitor_id}: {e}"),
            }
diff --git a/client/src/output/audio.rs b/client/src/output/audio.rs
index a555f2d..19f7ef8 100644
--- a/client/src/output/audio.rs
+++ b/client/src/output/audio.rs
@@ -1,27 +1,53 @@
 // client/src/output/audio.rs
 
+use anyhow::{Context, Result};
 use gstreamer as gst;
 use gstreamer_app as gst_app;
-use lesavka_common::lesavka::AudioPacket;
 use gst::prelude::*;
 use tracing::{error, info, warn, debug};
 
+use lesavka_common::lesavka::AudioPacket;
+
 pub struct AudioOut {
+    pipeline: gst::Pipeline,
     src: gst_app::AppSrc,
 }
 
 impl AudioOut {
    pub fn new() -> anyhow::Result<Self> {
-        gst::init()?;
+        gst::init().context("initialising GStreamer")?;
 
-        // Auto-audiosink picks PipeWire / Pulse etc.
-        const PIPE: &str =
-            "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
-             queue leaky=downstream ! aacparse ! avdec_aac ! audioresample ! autoaudiosink";
+        // ── 1. Decide which sink element to instantiate ────────────────────
+        let sink = pick_sink_element()?;
 
-        // `parse_launch()` returns `gst::Element`; down-cast manually and
-        // map the error into anyhow ourselves (no `?` on the downcast).
-        let pipeline: gst::Pipeline = gst::parse::launch(PIPE)?
+        // Operator can request a tee to /tmp via LESAVKA_TAP_AUDIO=1
+        let tee_dump = std::env::var("LESAVKA_TAP_AUDIO")
+            .ok()
+            .as_deref()
+            .map(|v| v == "1")
+            .unwrap_or(false);
+
+        // ── 2. Assemble pipeline description string ────────────────────────
+        let mut pipe = format!(
+            "appsrc name=src is-live=true format=time do-timestamp=true \
+             block=false ! \
+             queue leaky=downstream ! \
+             aacparse ! avdec_aac ! audioresample ! audioconvert ! {}",
+            sink,
+        );
+        if tee_dump {
+            pipe = format!(
+                "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
+                 tee name=t ! \
+                 queue leaky=downstream ! aacparse ! avdec_aac ! audioresample ! audioconvert ! {} \
+                 t. ! queue ! filesink location=/tmp/lesavka-audio.aac",
+                sink,
+            );
+            warn!("πŸ’Ύ tee to /tmp/lesavka-audio.aac enabled (LESAVKA_TAP_AUDIO=1)");
+        }
+
+        // ── 3. Create the pipeline & fetch the AppSrc ──────────────────────
+        let pipeline: gst::Pipeline = gst::parse::launch(&pipe)?
            .downcast::<gst::Pipeline>()
            .expect("not a pipeline");
@@ -31,41 +57,118 @@ impl AudioOut {
            .downcast::<gst_app::AppSrc>()
            .expect("src not an AppSrc");
 
-        src.set_caps(Some(&gst::Caps::builder("audio/mpeg") // mpeg‑4 AAC
-            .field("mpegversion", &4i32)
-            .field("stream-format", &"adts")
-            .build()));
+        src.set_caps(Some(&gst::Caps::builder("audio/mpeg")
+            .field("mpegversion", &4i32)      // AAC
+            .field("stream-format", &"adts")  // ADTS frames
+            .field("rate", &48_000i32)        // 48β€―kHz
+            .field("channels", &2i32)         // stereo
+            .build()
+        ));
        src.set_format(gst::Format::Time);
-
-        {
-            let bus = pipeline.bus().expect("bus");
-            std::thread::spawn(move || {
-                use gst::MessageView::*;
-                for msg in bus.iter_timed(gst::ClockTime::NONE) {
-                    if let Error(e) = msg.view() {
-                        error!("πŸ’₯ client‑audio: {} ({})",
-                               e.error(), e.debug().unwrap_or_default());
-                    }
+
+        // ── 4. Log *all* warnings/errors from the bus ──────────────────────
+        let bus = pipeline.bus().unwrap();
+        std::thread::spawn(move || {
+            use gst::MessageView::*;
+            for msg in bus.iter_timed(gst::ClockTime::NONE) {
+                match msg.view() {
+                    Error(e)   => error!("πŸ’₯ gst error from {:?}: {} ({})",
+                                         msg.src().map(|s| s.path_string()),
+                                         e.error(), e.debug().unwrap_or_default()),
+                    Warning(w) => warn!("⚠️ gst warning from {:?}: {} ({})",
+                                        msg.src().map(|s| s.path_string()),
+                                        w.error(), w.debug().unwrap_or_default()),
+                    Element(e) => debug!("πŸ”Ž gst element message: {}", e
+                        .structure()
+                        .map(|s| s.to_string())
+                        .unwrap_or_default()),
+                    StateChanged(s) if s.current() == gst::State::Playing =>
+                        info!("πŸ”Š audio pipeline PLAYING (sink='{}')", sink),
+                    _ => {}
                }
-            });
-        }
+            }
+        });
 
-        pipeline.set_state(gst::State::Playing)?;
+        pipeline.set_state(gst::State::Playing).context("starting audio pipeline")?;
 
-        Ok(Self { src })
+        Ok(Self { pipeline, src })
    }
 
    pub fn push(&self, pkt: AudioPacket) {
-        static CNT : std::sync::atomic::AtomicU64 =
-            std::sync::atomic::AtomicU64::new(0);
-        let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-        if n % 300 == 0 || n < 10 {
-            debug!(bytes = pkt.data.len(), pts = pkt.pts, "⬇️ received audio AU");
-        }
        let mut buf = gst::Buffer::from_slice(pkt.data);
        buf.get_mut()
            .unwrap()
            .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
-        let _ = self.src.push_buffer(buf);
+        if let Err(e) = self.src.push_buffer(buf) {
+            warn!("πŸ“‰ AppSrc push failed: {e:?}");
+        }
    }
 }
+
+impl Drop for AudioOut {
+    fn drop(&mut self) {
+        // put the whole pipeline back to NULL so GStreamer can dispose cleanly
+        let _ = self.pipeline.set_state(gst::State::Null);
+    }
+}
+
+/*──────────────── helper: sink selection ─────────────────────────────*/
+fn pick_sink_element() -> Result<String> {
+    // 1. Operator override
+    if let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") {
+        info!("πŸŽ›οΈ sink overridden via LESAVKA_AUDIO_SINK={}", s);
+        return Ok(s);
+    }
+
+    // 2. Query PipeWire for default & running sinks
+    //    (works even if PulseAudio is present because PipeWire mimics it)
+    let sinks = list_pw_sinks(); // Vec<(name, state)>
+    for (n, st) in &sinks {
+        if *st == "RUNNING" {
+            info!("πŸ”ˆ using default RUNNING sink '{}'", n);
+            return Ok(format!("pulsesink device={}", n));
+        }
+    }
+
+    // 3. First RUNNING sink
+    if let Some((n, _)) = sinks.iter().find(|(_, st)| *st == "RUNNING") {
+        warn!("πŸͺ„ picking first RUNNING sink '{}'", n);
+        return Ok(format!("pulsesink device={}", n));
+    }
+    // 4. Anything
+    if let Some((n, _)) = sinks.first() {
+        warn!("πŸͺ„ picking first sink '{}'", n);
+        return Ok(format!("pulsesink device={}", n));
+    }
+
+    // Fallback – let autoaudiosink try its luck
+    warn!("😬 no PipeWire sinks readable – falling back to autoaudiosink");
+    Ok("autoaudiosink".to_string())
+}
+
+/// Minimal PipeWire sink enumerator (no extra crate required).
+fn list_pw_sinks() -> Vec<(String, String)> {
+    let mut out = Vec::new();
+    if let Ok(lines) = std::process::Command::new("pw-cli")
+        .args(["ls", "Node"])
+        .output()
+        .map(|o| String::from_utf8_lossy(&o.stdout).to_string())
+    {
+        for l in lines.lines() {
+            // Example: "  36 β”‚ node.alive = true β”‚ alsa_output.pci-0000_2f_00.4.iec958-stereo β”‚ state: SUSPENDED ..."
+            if let Some(pos) = l.find("β”‚") {
+                let parts: Vec<_> = l[pos..].split('β”‚').map(|s| s.trim()).collect();
+                if parts.len() >= 3 && parts[2].starts_with("alsa_output.") {
+                    let name = parts[2].to_string();
+                    // try to parse state, else UNKNOWN
+                    let state = parts.get(3)
+                        .and_then(|s| s.split_whitespace().nth(1))
+                        .unwrap_or("UNKNOWN")
+                        .to_string();
+                    out.push((name, state));
+                }
+            }
+        }
+    }
+    out
+}
diff --git a/scripts/manual/vpn-open.sh b/scripts/manual/vpn-open.sh
old mode 100644
new mode 100755
diff --git a/scripts/manual/vpn-test.sh b/scripts/manual/vpn-test.sh
old mode 100644
new mode 100755
diff --git a/server/src/audio.rs b/server/src/audio.rs
index a6b2ff0..8c0812c 100644
--- a/server/src/audio.rs
+++ b/server/src/audio.rs
@@ -1,7 +1,7 @@
 // server/src/audio.rs
 #![forbid(unsafe_code)]
 
-use anyhow::Context;
+use anyhow::{Context, anyhow};
 use futures_util::Stream;
 use gstreamer as gst;
 use gstreamer_app as gst_app;
@@ -128,19 +128,21 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result {
    })
 }
 
+// ─── server/src/audio.rs: build_pipeline_desc() (replace entire fn) ─────────
 fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
-    // choose the first encoder that exists on the system
-    let enc = ["voaacenc", "avenc_aac", "fdkaacenc"]
+    let reg = gst::Registry::get();
+    let enc = ["fdkaacenc", "voaacenc", "avenc_aac"]
        .into_iter()
-        .find(|e| ElementFactory::find(e).is_some()) // cheap run‑time probe
-        .ok_or_else(|| anyhow::anyhow!("no AAC encoder plugin available"))?;
+        .find(|&e| {
+            reg.find_plugin(e).is_some()
+                || reg.find_feature(e, ElementFactory::static_type()).is_some()
+        })
+        .ok_or_else(|| anyhow!("no AAC encoder plugin available"))?;
 
    Ok(format!(
-        // 48β€―kHz stereo, floats, gadget is master β†’ provide‑clock=false
-        "alsasrc device=\"{dev}\" provide-clock=false do-timestamp=true ! \
-         audioconvert ! audioresample ! \
-         audio/x-raw,format=F32LE,channels=2,rate=48000 ! \
-         {enc} bitrate=192000 ! aacparse ! queue ! \
-         appsink name=asink emit-signals=true max-buffers=64 drop=true"
+        "alsasrc device=\"{dev}\" do-timestamp=true ! \
+         audio/x-raw,channels=2,rate=48000 ! {enc} bitrate=192000 ! \
+         aacparse add-adts=true ! \
+         queue ! appsink name=asink emit-signals=true"
    ))
-}
+}
\ No newline at end of file
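
The patch itself never shows who calls `AudioOut::new()` and `push()`. Below is a minimal, hypothetical wiring sketch for the client side, assuming the gRPC audio task publishes `AudioPacket`s on a `tokio::sync::broadcast` channel the same way the video task in `client/src/app.rs` feeds its `tx`; the function name, module path, and channel layout are illustrative assumptions, not part of this patch.

// Hypothetical sketch (not in the diff): drain a broadcast receiver of
// AudioPacket and feed the AAC access units into the AudioOut pipeline.
use lesavka_common::lesavka::AudioPacket;
use tokio::sync::broadcast;
use tracing::{error, warn};

use crate::output::audio::AudioOut;

pub fn spawn_audio_output(mut rx: broadcast::Receiver<AudioPacket>) {
    tokio::spawn(async move {
        // Build the playback pipeline once; bail out loudly if the sink
        // probe or pipeline construction fails.
        let out = match AudioOut::new() {
            Ok(o) => o,
            Err(e) => {
                error!("❌πŸ”Š could not start audio output: {e:#}");
                return;
            }
        };
        loop {
            match rx.recv().await {
                // Hand each received AAC AU to the appsrc.
                Ok(pkt) => out.push(pkt),
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!("⚠️πŸ”Š audio receiver lagged, dropped {n} packets");
                }
                // Sender side is gone; stop and let Drop reset the pipeline.
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
    });
}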