diff --git a/client/src/output/audio.rs b/client/src/output/audio.rs
index 19f7ef8..9ada77c 100644
--- a/client/src/output/audio.rs
+++ b/client/src/output/audio.rs
@@ -149,26 +149,41 @@ fn pick_sink_element() -> Result {
 /// Minimal PipeWire sink enumerator (no extra crate required).
 fn list_pw_sinks() -> Vec<(String, String)> {
     let mut out = Vec::new();
-    if let Ok(lines) = std::process::Command::new("pw-cli")
-        .args(["ls", "Node"])
-        .output()
-        .map(|o| String::from_utf8_lossy(&o.stdout).to_string())
-    {
-        for l in lines.lines() {
-            // Example: "  36 │ node.alive = true │ alsa_output.pci-0000_2f_00.4.iec958-stereo │ state: SUSPENDED ..."
-            if let Some(pos) = l.find("│") {
-                let parts: Vec<_> = l[pos..].split('│').map(|s| s.trim()).collect();
-                if parts.len() >= 3 && parts[2].starts_with("alsa_output.") {
-                    let name = parts[2].to_string();
-                    // try to parse state, else UNKNOWN
-                    let state = parts.get(3)
-                        .and_then(|s| s.split_whitespace().nth(1))
-                        .unwrap_or("UNKNOWN")
-                        .to_string();
-                    out.push((name, state));
-                }
+    // if let Ok(lines) = std::process::Command::new("pw-cli")
+    //     .args(["ls", "Node"])
+    //     .output()
+    //     .map(|o| String::from_utf8_lossy(&o.stdout).to_string())
+    // {
+    //     for l in lines.lines() {
+    //         // Example: "  36 │ node.alive = true │ alsa_output.pci-0000_2f_00.4.iec958-stereo │ state: SUSPENDED ..."
+    //         if let Some(pos) = l.find("│") {
+    //             let parts: Vec<_> = l[pos..].split('│').map(|s| s.trim()).collect();
+    //             if parts.len() >= 3 && parts[2].starts_with("alsa_output.") {
+    //                 let name = parts[2].to_string();
+    //                 // try to parse state, else UNKNOWN
+    //                 let state = parts.get(3)
+    //                     .and_then(|s| s.split_whitespace().nth(1))
+    //                     .unwrap_or("UNKNOWN")
+    //                     .to_string();
+    //                 out.push((name, state));
+    //             }
+    //         }
+    //     }
+    // }
+
+    if out.is_empty() {
+        // ── PulseAudio / pactl fallback ────────────────────────────────
+        if let Ok(info) = std::process::Command::new("pactl")
+            .args(["info"])
+            .output()
+            .map(|o| String::from_utf8_lossy(&o.stdout).to_string())
+        {
+            if let Some(line) = info.lines().find(|l| l.starts_with("Default Sink:")) {
+                let def = line["Default Sink:".len()..].trim();
+                return vec![(def.to_string(), "UNKNOWN".to_string())];
            }
        }
    }
+
     out
 }
diff --git a/scripts/manual/audio-clip-fetch.sh b/scripts/manual/audio-clip-fetch.sh
new file mode 100644
index 0000000..8d4ae24
--- /dev/null
+++ b/scripts/manual/audio-clip-fetch.sh
@@ -0,0 +1,14 @@
+#!/usr/bin/env bash
+# scripts/manual/audio-clip-fetch.sh
+
+# Pull & play the most recent 1 s AAC clip from lesavka‑server
+PI_HOST="nikto@192.168.42.253"   # adjust
+REMOTE_DIR="/tmp"
+DEST_DIR="$(mktemp -d)"
+
+scp "${PI_HOST}:${REMOTE_DIR}/ear-*.aac" "${DEST_DIR}/" 2>/dev/null \
+  || { echo "❌ no clip files yet"; exit 1; }
+
+LATEST=$(ls -1t "${DEST_DIR}"/ear-*.aac | head -n1)
+echo "🎧 playing ${LATEST} ..."
+gst-play-1.0 --quiet "${LATEST}"
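Note (not part of the patch): the fetch script above only finds something once the server has actually written clips under /tmp on the Pi, which, per the clip_tap() comment further down, happens only while the server log level is TRACE. A quick pre-check, reusing the script's own PI_HOST and REMOTE_DIR values:

    ssh nikto@192.168.42.253 'ls -1t /tmp/ear-*.aac | head -n3'

An empty listing means the clip tap has not fired yet.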
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 3099c0f..e0b1280 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -24,6 +24,7 @@ gstreamer-app = { version = "0.23", features = ["v1_22"] }
 gstreamer-video = "0.23"
 udev = "0.8"
 prost-types = "0.13"
+chrono = { version = "0.4", default-features = false, features = ["std", "clock", "serde"] }
 
 [build-dependencies]
 prost-build = "0.13"
diff --git a/server/src/audio.rs b/server/src/audio.rs
index bf08362..0729e3c 100644
--- a/server/src/audio.rs
+++ b/server/src/audio.rs
@@ -1,17 +1,19 @@
 // server/src/audio.rs
 #![forbid(unsafe_code)]
 
-use anyhow::{Context, anyhow};
+use anyhow::{anyhow, Context};
+use chrono::Local;
 use futures_util::Stream;
 use gstreamer as gst;
 use gstreamer_app as gst_app;
 use gst::prelude::*;
-use gst::ElementFactory;
+use gst::{ElementFactory, MessageView};
 use gst::MessageView::*;
 use lesavka_common::lesavka::AudioPacket;
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::Status;
 use tracing::{debug, error, warn};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 /// “Speaker” stream coming **from** the remote host (UAC2‑gadget playback
 /// endpoint) **towards** the client.
@@ -63,6 +65,12 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result {
         .expect("asink")
         .downcast()
         .expect("appsink");
+    if let Some(tap) = pipeline
+        .by_name("debugtap")
+        .and_then(|e| e.downcast::<gst_app::AppSink>().ok())
+    {
+        clip_tap(tap);
+    }
 
     let (tx, rx) = tokio::sync::mpsc::channel(8192);
@@ -128,26 +136,90 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result {
     })
 }
 
-// ─── server/src/audio.rs: build_pipeline_desc()  (replace entire fn) ─────────
+/*────────────────────────── build_pipeline_desc ───────────────────────────*/
 fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
     let reg = gst::Registry::get();
 
-    // Pick the first available encoder
-    let enc = ["avenc_aac", "fdkaacenc", "voaacenc"]
+    // first available encoder
+    let enc = ["fdkaacenc", "voaacenc", "avenc_aac"]
         .into_iter()
         .find(|&e| {
             reg.find_plugin(e).is_some()
-                || reg.find_feature(e, ElementFactory::static_type()).is_some()
+                || reg
+                    .find_feature(e, ElementFactory::static_type())
+                    .is_some()
         })
         .ok_or_else(|| anyhow!("no AAC encoder plugin available"))?;
 
-    // All encoders are asked for raw elementary AAC; aacparse +
-    // capsfilter converts it to **ADTS** unconditionally.
+    // one long literal assembled with `concat!` so Rust sees *one* string
     Ok(format!(
-        "alsasrc device=\"{dev}\" do-timestamp=true ! \
-         audio/x-raw,channels=2,rate=48000 ! {enc} bitrate=192000 ! \
-         aacparse ! \
-         capsfilter caps=audio/mpeg,stream-format=adts,channels=2,rate=48000 ! \
-         queue ! appsink name=asink emit-signals=true"
+        concat!(
+            "alsasrc device=\"{dev}\" do-timestamp=true ! ",
+            "audio/x-raw,format=S16LE,channels=2,rate=48000 ! ",
+            "audioconvert ! audioresample ! {enc} bitrate=192000 ! ",
+            "aacparse ! ",
+            "capsfilter caps=audio/mpeg,stream-format=adts,channels=2,rate=48000 ! ",
+            "tee name=t ",
+            "t. ! queue ! appsink name=asink emit-signals=true ",
+            "t. ! queue ! appsink name=debugtap emit-signals=true max-buffers=500 drop=true"
+        ),
+        dev = dev,
+        enc = enc
     ))
+}
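Note (not part of the patch): build_pipeline_desc() above takes the first of fdkaacenc, voaacenc, avenc_aac that is registered. Which one a given target will end up with can be checked beforehand with gst-inspect-1.0, which exits non-zero for elements it cannot find:

    for e in fdkaacenc voaacenc avenc_aac; do
        gst-inspect-1.0 "$e" >/dev/null 2>&1 && echo "available: $e"
    done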
+
+/*────────────────────────────── clip_tap() ────────────────────────────────*/
+/// Called once per pipeline; spawns a thread that writes a 1 s AAC file
+/// at the start of every wall‑clock minute **while log‑level == TRACE**.
+fn clip_tap(tap: gst_app::AppSink) {
+    use gst::prelude::*;
+
+    std::thread::spawn(move || {
+        use std::fs::File;
+        use std::io::Write;
+
+        let mut collecting = Vec::with_capacity(200_000); // ~1 s
+        let mut next_min_boundary = next_minute();
+
+        loop {
+            match tap.pull_sample() {
+                Ok(s) => {
+                    let buf = s.buffer().unwrap();
+                    let map = buf.map_readable().unwrap();
+                    collecting.extend_from_slice(map.as_slice());
+
+                    // once per minute boundary & trace‑level
+                    if tracing::enabled!(tracing::Level::TRACE) &&
+                       SystemTime::now() >= next_min_boundary {
+                        if !collecting.is_empty() {
+                            let ts = Local::now()
+                                .format("%Y%m%d-%H%M%S")
+                                .to_string();
+                            let path = format!("/tmp/ear-{ts}.aac");
+                            if let Ok(mut f) = File::create(&path) {
+                                let _ = f.write_all(&collecting);
+                                tracing::debug!("📼 wrote 1 s clip → {}", path);
+                            }
+                        }
+                        collecting.clear();
+                        next_min_boundary = next_minute();
+                    }
+
+                    if collecting.len() > 192_000 { // keep at most ~1 s
+                        collecting.truncate(192_000);
+                    }
+                }
+                Err(_) => break, // EOS
+            }
+        }
+    });
+
+    fn next_minute() -> SystemTime {
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap();
+        let secs = now.as_secs();
+        let next = (secs / 60 + 1) * 60;
+        UNIX_EPOCH + Duration::from_secs(next)
+    }
+}
\ No newline at end of file
diff --git a/server/src/video.rs b/server/src/video.rs
index b26a009..1986b60 100644
--- a/server/src/video.rs
+++ b/server/src/video.rs
@@ -5,6 +5,7 @@ use gstreamer as gst;
 use gstreamer_app as gst_app;
 use gst::prelude::*;
 use gst::{log, MessageView};
+use gst::MessageView::*;
 use lesavka_common::lesavka::VideoPacket;
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::Status;
@@ -98,7 +99,6 @@ pub async fn eye_ball(
     let eye_clone = eye.to_owned();
     std::thread::spawn(move || {
         for msg in bus.iter_timed(gst::ClockTime::NONE) {
-            use gst::MessageView::*;
             match msg.view() {
                 Error(err) => {
                     error!(target:"lesavka_server::video",
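Note (not part of the patch): a rough end-to-end check of the new debug tap, assuming the server honours RUST_LOG through a tracing-subscriber EnvFilter and that PI_HOST in scripts/manual/audio-clip-fetch.sh has been adjusted:

    # on the Pi: run with TRACE logging so clip_tap() starts writing /tmp/ear-*.aac once a minute
    RUST_LOG=trace ./lesavka-server

    # on the client machine, after at least one minute boundary has passed
    ./scripts/manual/audio-clip-fetch.sh

The binary name and the RUST_LOG mechanism are assumptions; only the /tmp/ear-*.aac path and the once-per-minute behaviour come from the diff itself.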