Audio Fix
This commit is contained in:
parent
f8528897de
commit
1d54293311
@ -5,7 +5,7 @@ use std::time::Duration;
|
|||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
|
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
|
||||||
use tonic::{transport::Channel, Request};
|
use tonic::{transport::Channel, Request};
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, trace, debug, info, warn};
|
||||||
use winit::{
|
use winit::{
|
||||||
event::Event,
|
event::Event,
|
||||||
event_loop::{EventLoopBuilder, ControlFlow},
|
event_loop::{EventLoopBuilder, ControlFlow},
|
||||||
@ -203,23 +203,23 @@ impl LesavkaClientApp {
|
|||||||
let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 };
|
let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 };
|
||||||
match cli.capture_video(Request::new(req)).await {
|
match cli.capture_video(Request::new(req)).await {
|
||||||
Ok(mut stream) => {
|
Ok(mut stream) => {
|
||||||
tracing::info!("🎥 cli video{monitor_id}: stream opened");
|
info!("🎥 cli video{monitor_id}: stream opened");
|
||||||
while let Some(res) = stream.get_mut().message().await.transpose() {
|
while let Some(res) = stream.get_mut().message().await.transpose() {
|
||||||
match res {
|
match res {
|
||||||
Ok(pkt) => {
|
Ok(pkt) => {
|
||||||
tracing::debug!("🎥 cli video{monitor_id}: got {} bytes", pkt.data.len());
|
trace!("🎥 cli video{monitor_id}: got {} bytes", pkt.data.len());
|
||||||
if tx.send(pkt).is_err() {
|
if tx.send(pkt).is_err() {
|
||||||
tracing::warn!("⚠️🎥 cli video{monitor_id}: GUI thread gone");
|
warn!("⚠️🎥 cli video{monitor_id}: GUI thread gone");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("❌🎥 cli video{monitor_id}: gRPC error: {e}");
|
error!("❌🎥 cli video{monitor_id}: gRPC error: {e}");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tracing::warn!("⚠️🎥 li video{monitor_id}: stream ended");
|
warn!("⚠️🎥 cli video{monitor_id}: stream ended");
|
||||||
}
|
}
|
||||||
Err(e) => error!("❌🎥 video {monitor_id}: {e}"),
|
Err(e) => error!("❌🎥 video {monitor_id}: {e}"),
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,27 +1,53 @@
|
|||||||
// client/src/output/audio.rs
|
// client/src/output/audio.rs
|
||||||
|
|
||||||
|
use anyhow::{Context, Result};
|
||||||
use gstreamer as gst;
|
use gstreamer as gst;
|
||||||
use gstreamer_app as gst_app;
|
use gstreamer_app as gst_app;
|
||||||
use lesavka_common::lesavka::AudioPacket;
|
|
||||||
use gst::prelude::*;
|
use gst::prelude::*;
|
||||||
use tracing::{error, info, warn, debug};
|
use tracing::{error, info, warn, debug};
|
||||||
|
|
||||||
|
use lesavka_common::lesavka::AudioPacket;
|
||||||
|
|
||||||
pub struct AudioOut {
|
pub struct AudioOut {
|
||||||
|
pipeline: gst::Pipeline,
|
||||||
src: gst_app::AppSrc,
|
src: gst_app::AppSrc,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AudioOut {
|
impl AudioOut {
|
||||||
pub fn new() -> anyhow::Result<Self> {
|
pub fn new() -> anyhow::Result<Self> {
|
||||||
gst::init()?;
|
gst::init().context("initialising GStreamer")?;
|
||||||
|
|
||||||
// Auto-audiosink picks PipeWire / Pulse etc.
|
// ── 1. Decide which sink element to instantiate ────────────────────
|
||||||
const PIPE: &str =
|
let sink = pick_sink_element()?;
|
||||||
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
|
|
||||||
queue leaky=downstream ! aacparse ! avdec_aac ! audioresample ! autoaudiosink";
|
|
||||||
|
|
||||||
// `parse_launch()` returns `gst::Element`; down-cast manually and
|
// Operator can request a tee to /tmp via LESAVKA_TAP_AUDIO=1
|
||||||
// map the error into anyhow ourselves (no `?` on the downcast).
|
let tee_dump = std::env::var("LESAVKA_TAP_AUDIO")
|
||||||
let pipeline: gst::Pipeline = gst::parse::launch(PIPE)?
|
.ok()
|
||||||
|
.as_deref()
|
||||||
|
.map(|v| v == "1")
|
||||||
|
.unwrap_or(false);
|
||||||
|
|
||||||
|
// ── 2. Assemble pipeline description string ────────────────────────
|
||||||
|
let mut pipe = format!(
|
||||||
|
"appsrc name=src is-live=true format=time do-timestamp=true \
|
||||||
|
block=false ! \
|
||||||
|
queue leaky=downstream ! \
|
||||||
|
aacparse ! avdec_aac ! audioresample ! audioconvert ! {}",
|
||||||
|
sink,
|
||||||
|
);
|
||||||
|
if tee_dump {
|
||||||
|
pipe = format!(
|
||||||
|
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
|
||||||
|
tee name=t ! \
|
||||||
|
queue leaky=downstream ! aacparse ! avdec_aac ! audioresample ! audioconvert ! {} \
|
||||||
|
t. ! queue ! filesink location=/tmp/lesavka-audio.aac",
|
||||||
|
sink,
|
||||||
|
);
|
||||||
|
warn!("💾 tee to /tmp/lesavka-audio.aac enabled (LESAVKA_TAP_AUDIO=1)");
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── 3. Create the pipeline & fetch the AppSrc ───────────────────────
|
||||||
|
let pipeline: gst::Pipeline = gst::parse::launch(&pipe)?
|
||||||
.downcast::<gst::Pipeline>()
|
.downcast::<gst::Pipeline>()
|
||||||
.expect("not a pipeline");
|
.expect("not a pipeline");
|
||||||
|
|
||||||
@ -31,41 +57,118 @@ impl AudioOut {
|
|||||||
.downcast::<gst_app::AppSrc>()
|
.downcast::<gst_app::AppSrc>()
|
||||||
.expect("src not an AppSrc");
|
.expect("src not an AppSrc");
|
||||||
|
|
||||||
src.set_caps(Some(&gst::Caps::builder("audio/mpeg") // mpeg‑4 AAC
|
src.set_caps(Some(&gst::Caps::builder("audio/mpeg")
|
||||||
.field("mpegversion", &4i32)
|
.field("mpegversion", &4i32) // AAC
|
||||||
.field("stream-format", &"adts")
|
.field("stream-format", &"adts") // ADTS frames
|
||||||
.build()));
|
.field("rate", &48_000i32) // 48 kHz
|
||||||
|
.field("channels", &2i32) // stereo
|
||||||
|
.build()
|
||||||
|
));
|
||||||
src.set_format(gst::Format::Time);
|
src.set_format(gst::Format::Time);
|
||||||
|
|
||||||
{
|
// ── 4. Log *all* warnings/errors from the bus ──────────────────────
|
||||||
let bus = pipeline.bus().expect("bus");
|
let bus = pipeline.bus().unwrap();
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
use gst::MessageView::*;
|
use gst::MessageView::*;
|
||||||
for msg in bus.iter_timed(gst::ClockTime::NONE) {
|
for msg in bus.iter_timed(gst::ClockTime::NONE) {
|
||||||
if let Error(e) = msg.view() {
|
match msg.view() {
|
||||||
error!("💥 client‑audio: {} ({})",
|
Error(e) => error!("💥 gst error from {:?}: {} ({})",
|
||||||
e.error(), e.debug().unwrap_or_default());
|
msg.src().map(|s| s.path_string()),
|
||||||
}
|
e.error(), e.debug().unwrap_or_default()),
|
||||||
|
Warning(w) => warn!("⚠️ gst warning from {:?}: {} ({})",
|
||||||
|
msg.src().map(|s| s.path_string()),
|
||||||
|
w.error(), w.debug().unwrap_or_default()),
|
||||||
|
Element(e) => debug!("🔎 gst element message: {}", e
|
||||||
|
.structure()
|
||||||
|
.map(|s| s.to_string())
|
||||||
|
.unwrap_or_default()),
|
||||||
|
StateChanged(s) if s.current() == gst::State::Playing =>
|
||||||
|
info!("🔊 audio pipeline PLAYING (sink='{}')", sink),
|
||||||
|
_ => {}
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
pipeline.set_state(gst::State::Playing)?;
|
pipeline.set_state(gst::State::Playing).context("starting audio pipeline")?;
|
||||||
|
|
||||||
Ok(Self { src })
|
Ok(Self { pipeline, src })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn push(&self, pkt: AudioPacket) {
|
pub fn push(&self, pkt: AudioPacket) {
|
||||||
static CNT : std::sync::atomic::AtomicU64 =
|
|
||||||
std::sync::atomic::AtomicU64::new(0);
|
|
||||||
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
||||||
if n % 300 == 0 || n < 10 {
|
|
||||||
debug!(bytes = pkt.data.len(), pts = pkt.pts, "⬇️ received audio AU");
|
|
||||||
}
|
|
||||||
let mut buf = gst::Buffer::from_slice(pkt.data);
|
let mut buf = gst::Buffer::from_slice(pkt.data);
|
||||||
buf.get_mut()
|
buf.get_mut()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
|
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
|
||||||
let _ = self.src.push_buffer(buf);
|
if let Err(e) = self.src.push_buffer(buf) {
|
||||||
|
warn!("📉 AppSrc push failed: {e:?}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for AudioOut {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// put the whole pipeline back to NULL so GStreamer can dispose cleanly
|
||||||
|
let _ = self.pipeline.set_state(gst::State::Null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*──────────────── helper: sink selection ─────────────────────────────*/
|
||||||
|
fn pick_sink_element() -> Result<String> {
|
||||||
|
// 1. Operator override
|
||||||
|
if let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") {
|
||||||
|
info!("🎛️ sink overridden via LESAVKA_AUDIO_SINK={}", s);
|
||||||
|
return Ok(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Query PipeWire for default & running sinks
|
||||||
|
// (works even if PulseAudio is present because PipeWire mimics it)
|
||||||
|
let sinks = list_pw_sinks(); // Vec<(name,state)>
|
||||||
|
for (n, st) in &sinks {
|
||||||
|
if *st == "RUNNING" {
|
||||||
|
info!("🔈 using default RUNNING sink '{}'", n);
|
||||||
|
return Ok(format!("pulsesink device={}", n));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. First RUNNING sink
|
||||||
|
if let Some((n, _)) = sinks.iter().find(|(_, st)| *st == "RUNNING") {
|
||||||
|
warn!("🪄 picking first RUNNING sink '{}'", n);
|
||||||
|
return Ok(format!("pulsesink device={}", n));
|
||||||
|
}
|
||||||
|
// 4. Anything
|
||||||
|
if let Some((n, _)) = sinks.first() {
|
||||||
|
warn!("🪄 picking first sink '{}'", n);
|
||||||
|
return Ok(format!("pulsesink device={}", n));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback – let autoaudiosink try its luck
|
||||||
|
warn!("😬 no PipeWire sinks readable – falling back to autoaudiosink");
|
||||||
|
Ok("autoaudiosink".to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Minimal PipeWire sink enumerator (no extra crate required).
|
||||||
|
fn list_pw_sinks() -> Vec<(String, String)> {
|
||||||
|
let mut out = Vec::new();
|
||||||
|
if let Ok(lines) = std::process::Command::new("pw-cli")
|
||||||
|
.args(["ls", "Node"])
|
||||||
|
.output()
|
||||||
|
.map(|o| String::from_utf8_lossy(&o.stdout).to_string())
|
||||||
|
{
|
||||||
|
for l in lines.lines() {
|
||||||
|
// Example: " 36 │ node.alive = true │ alsa_output.pci-0000_2f_00.4.iec958-stereo │ state: SUSPENDED ..."
|
||||||
|
if let Some(pos) = l.find("│") {
|
||||||
|
let parts: Vec<_> = l[pos..].split('│').map(|s| s.trim()).collect();
|
||||||
|
if parts.len() >= 3 && parts[2].starts_with("alsa_output.") {
|
||||||
|
let name = parts[2].to_string();
|
||||||
|
// try to parse state, else UNKNOWN
|
||||||
|
let state = parts.get(3)
|
||||||
|
.and_then(|s| s.split_whitespace().nth(1))
|
||||||
|
.unwrap_or("UNKNOWN")
|
||||||
|
.to_string();
|
||||||
|
out.push((name, state));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|||||||
0
scripts/manual/vpn-open.sh
Normal file → Executable file
0
scripts/manual/vpn-open.sh
Normal file → Executable file
0
scripts/manual/vpn-test.sh
Normal file → Executable file
0
scripts/manual/vpn-test.sh
Normal file → Executable file
@ -1,7 +1,7 @@
|
|||||||
// server/src/audio.rs
|
// server/src/audio.rs
|
||||||
#![forbid(unsafe_code)]
|
#![forbid(unsafe_code)]
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::{Context, anyhow};
|
||||||
use futures_util::Stream;
|
use futures_util::Stream;
|
||||||
use gstreamer as gst;
|
use gstreamer as gst;
|
||||||
use gstreamer_app as gst_app;
|
use gstreamer_app as gst_app;
|
||||||
@ -128,19 +128,21 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ─── server/src/audio.rs: build_pipeline_desc() (replace entire fn) ─────────
|
||||||
fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
|
fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
|
||||||
// choose the first encoder that exists on the system
|
let reg = gst::Registry::get();
|
||||||
let enc = ["voaacenc", "avenc_aac", "fdkaacenc"]
|
let enc = ["fdkaacenc", "voaacenc", "avenc_aac"]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.find(|e| ElementFactory::find(e).is_some()) // cheap run‑time probe
|
.find(|&e| {
|
||||||
.ok_or_else(|| anyhow::anyhow!("no AAC encoder plugin available"))?;
|
reg.find_plugin(e).is_some()
|
||||||
|
|| reg.find_feature(e, ElementFactory::static_type()).is_some()
|
||||||
|
})
|
||||||
|
.ok_or_else(|| anyhow!("no AAC encoder plugin available"))?;
|
||||||
|
|
||||||
Ok(format!(
|
Ok(format!(
|
||||||
// 48 kHz stereo, floats, gadget is master → provide‑clock=false
|
"alsasrc device=\"{dev}\" do-timestamp=true ! \
|
||||||
"alsasrc device=\"{dev}\" provide-clock=false do-timestamp=true ! \
|
audio/x-raw,channels=2,rate=48000 ! {enc} bitrate=192000 ! \
|
||||||
audioconvert ! audioresample ! \
|
aacparse add-adts=true ! \
|
||||||
audio/x-raw,format=F32LE,channels=2,rate=48000 ! \
|
queue ! appsink name=asink emit-signals=true"
|
||||||
{enc} bitrate=192000 ! aacparse ! queue ! \
|
|
||||||
appsink name=asink emit-signals=true max-buffers=64 drop=true"
|
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user