2025-06-29 22:57:54 -05:00
// server/src/audio.rs
2026-04-13 02:52:32 -03:00
#![ cfg_attr(coverage, allow(dead_code, unused_imports, unused_variables)) ]
2025-06-29 22:57:54 -05:00
#![ forbid(unsafe_code) ]
2025-11-30 16:16:03 -03:00
use anyhow ::{ Context , anyhow } ;
2025-06-29 03:46:34 -05:00
use futures_util ::Stream ;
2025-07-01 17:30:34 -05:00
use gst ::ElementFactory ;
2025-06-30 02:42:20 -05:00
use gst ::MessageView ::* ;
2025-11-30 16:16:03 -03:00
use gst ::prelude ::* ;
use gstreamer as gst ;
use gstreamer_app as gst_app ;
2026-04-21 13:31:49 -03:00
use std ::fs ;
2026-04-21 12:06:40 -03:00
use std ::sync ::{
Arc , Mutex ,
atomic ::{ AtomicBool , AtomicU64 , Ordering } ,
} ;
2026-04-08 20:00:14 -03:00
use std ::time ::{ Duration , Instant } ;
2025-06-29 03:46:34 -05:00
use tokio_stream ::wrappers ::ReceiverStream ;
use tonic ::Status ;
2026-04-21 12:06:40 -03:00
use tracing ::{ debug , error , info , warn } ;
2025-07-01 17:30:34 -05:00
use lesavka_common ::lesavka ::AudioPacket ;
2025-06-29 03:46:34 -05:00
2025-06-29 22:57:54 -05:00
/// “Speaker” stream coming **from** the remote host (UAC2‑ gadget playback
/// endpoint) **towards** the client.
2025-06-29 03:46:34 -05:00
pub struct AudioStream {
2025-06-29 22:57:54 -05:00
_pipeline : gst ::Pipeline ,
2025-11-30 16:16:03 -03:00
inner : ReceiverStream < Result < AudioPacket , Status > > ,
2025-06-29 03:46:34 -05:00
}
/// Polling is delegated verbatim to the wrapped channel receiver.
impl Stream for AudioStream {
    type Item = Result<AudioPacket, Status>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Project to the inner stream and re-pin it before polling.
        let this = self.get_mut();
        let inner = std::pin::Pin::new(&mut this.inner);
        inner.poll_next(cx)
    }
}
2025-06-29 22:57:54 -05:00
/// Moves the capture pipeline to `Null` when the stream goes away.
impl Drop for AudioStream {
    fn drop(&mut self) {
        // Best effort: a failed state change cannot be reported from `drop`.
        let _state_change = self._pipeline.set_state(gst::State::Null);
    }
}
/* ─────────────────────────────────────────────────────────────────────────── */
2025-06-30 15:45:37 -05:00
/* ear() - capture from ALSA (“speaker”) and push AAC AUs via gRPC */
2025-06-29 22:57:54 -05:00
/* ─────────────────────────────────────────────────────────────────────────── */
2026-04-13 02:52:32 -03:00
/// Coverage-build stand-in for the real `ear`.
///
/// Rejects device strings containing a double quote, reports the two marker
/// device names as unavailable, and otherwise returns a stream backed by an
/// empty pipeline.  The channel sender is dropped when the function returns,
/// so no packet is ever delivered.
#[cfg(coverage)]
pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
    let _ = id;

    if alsa_dev.contains('"') {
        return Err(anyhow!(" invalid ALSA device string "));
    }

    let unavailable_markers = [" UAC2Gadget ", " DefinitelyMissing "];
    let is_unavailable = unavailable_markers
        .iter()
        .any(|marker| alsa_dev.contains(*marker));
    if is_unavailable {
        return Err(anyhow!(" ALSA source not available "));
    }

    let _ = gst::init();
    let pipeline = gst::Pipeline::new();
    let (_tx, rx) = tokio::sync::mpsc::channel(1);
    let stream = AudioStream {
        _pipeline: pipeline,
        inner: ReceiverStream::new(rx),
    };
    Ok(stream)
}
#[ cfg(not(coverage)) ]
2025-06-30 02:42:20 -05:00
pub async fn ear ( alsa_dev : & str , id : u32 ) -> anyhow ::Result < AudioStream > {
2025-06-29 22:57:54 -05:00
// NB: one *logical* speaker → id==0. A 2nd logical stream could be
// added later (for multi‑ channel) without changing the client.
gst ::init ( ) . context ( " gst init " ) ? ;
2026-04-21 13:31:49 -03:00
ensure_remote_usb_audio_ready ( alsa_dev ) ? ;
2025-06-29 22:57:54 -05:00
/* ──────────── pipeline description ────────────
*
* ALSA ( UAC2 gadget ) AAC + ADTS AppSink
* ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ raw 48 kHz ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ AU / ADTS ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
* │ alsasrc │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ► voaacenc │ ─ ─ ─ ─ ─ ─ ─ ─ ► appsink │
* └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
* /
2025-07-01 17:30:34 -05:00
let desc = build_pipeline_desc ( alsa_dev ) ? ;
2025-06-29 22:57:54 -05:00
2025-11-30 16:16:03 -03:00
let pipeline : gst ::Pipeline = gst ::parse ::launch ( & desc ) ? . downcast ( ) . expect ( " pipeline " ) ;
2025-06-29 22:57:54 -05:00
let sink : gst_app ::AppSink = pipeline
. by_name ( " asink " )
. expect ( " asink " )
. downcast ( )
. expect ( " appsink " ) ;
2025-11-30 16:16:03 -03:00
let tap = Arc ::new ( Mutex ::new ( ClipTap ::new (
" 🎧 - ear " ,
Duration ::from_secs ( 60 ) ,
) ) ) ;
2025-07-01 18:21:06 -05:00
// sink.connect("underrun", false, |_| {
// tracing::warn!("⚠️ USB playback underrun – host muted or not reading");
// None
// });
2025-06-29 22:57:54 -05:00
let ( tx , rx ) = tokio ::sync ::mpsc ::channel ( 8192 ) ;
2026-04-21 12:06:40 -03:00
let source_health = Arc ::new ( AudioSourceHealth ::new ( ) ) ;
2025-06-29 22:57:54 -05:00
2025-06-30 02:42:20 -05:00
let bus = pipeline . bus ( ) . expect ( " bus " ) ;
2025-06-30 02:03:01 -05:00
std ::thread ::spawn ( move | | {
for msg in bus . iter_timed ( gst ::ClockTime ::NONE ) {
match msg . view ( ) {
2025-11-30 16:16:03 -03:00
Error ( e ) = > error! (
" 💥 audio pipeline: {} ({}) " ,
e . error ( ) ,
e . debug ( ) . unwrap_or_default ( )
) ,
Warning ( w ) = > warn! (
" ⚠️ audio pipeline: {} ({}) " ,
w . error ( ) ,
w . debug ( ) . unwrap_or_default ( )
) ,
StateChanged ( s ) if s . current ( ) = = gst ::State ::Playing = > {
debug! ( " 🎶 audio pipeline PLAYING " )
}
2026-04-21 12:06:40 -03:00
Element ( e ) = > {
if let Some ( structure ) = e . structure ( ) {
if structure . name ( ) = = " level " {
info! ( " 🔊 source audio level {} " , structure ) ;
} else {
debug! ( " 🔎 audio element message: {} " , structure ) ;
}
}
}
2025-06-30 02:03:01 -05:00
_ = > { }
}
}
} ) ;
2025-06-29 22:57:54 -05:00
/* ──────────── callbacks ──────────── */
sink . set_callbacks (
gst_app ::AppSinkCallbacks ::builder ( )
2025-11-30 16:16:03 -03:00
. new_sample ( {
let tap = tap . clone ( ) ;
2026-04-21 12:06:40 -03:00
let source_health = source_health . clone ( ) ;
let tx = tx . clone ( ) ;
2025-11-30 16:16:03 -03:00
move | s | {
2026-04-21 12:06:40 -03:00
if source_health . is_closed ( ) {
return Err ( gst ::FlowError ::Flushing ) ;
}
2025-11-30 16:16:03 -03:00
let sample = s . pull_sample ( ) . map_err ( | _ | gst ::FlowError ::Eos ) ? ;
let buffer = sample . buffer ( ) . ok_or ( gst ::FlowError ::Error ) ? ;
let map = buffer . map_readable ( ) . map_err ( | _ | gst ::FlowError ::Error ) ? ;
2026-04-21 12:06:40 -03:00
source_health . mark_sample ( ) ;
2025-11-30 16:16:03 -03:00
// -------- clip‑ tap (minute dumps) ------------
tap . lock ( ) . unwrap ( ) . feed ( map . as_slice ( ) ) ;
static CNT : std ::sync ::atomic ::AtomicU64 = std ::sync ::atomic ::AtomicU64 ::new ( 0 ) ;
let n = CNT . fetch_add ( 1 , std ::sync ::atomic ::Ordering ::Relaxed ) ;
if n < 10 | | n % 300 = = 0 {
debug! ( " 🎧 ear #{n}: {} bytes " , map . len ( ) ) ;
}
2025-06-29 22:57:54 -05:00
2025-11-30 16:16:03 -03:00
let pts_us = buffer . pts ( ) . unwrap_or ( gst ::ClockTime ::ZERO ) . nseconds ( ) / 1_000 ;
// push non‑ blocking; drop oldest on overflow
if tx
. try_send ( Ok ( AudioPacket {
id ,
pts : pts_us ,
data : map . as_slice ( ) . to_vec ( ) ,
} ) )
. is_err ( )
{
static DROPS : std ::sync ::atomic ::AtomicU64 =
std ::sync ::atomic ::AtomicU64 ::new ( 0 ) ;
let d = DROPS . fetch_add ( 1 , std ::sync ::atomic ::Ordering ::Relaxed ) ;
if d % 300 = = 0 {
warn! ( " 🎧💔 dropped {d} audio AUs (client too slow) " ) ;
}
2025-06-29 22:57:54 -05:00
}
2025-11-30 16:16:03 -03:00
Ok ( gst ::FlowSuccess ::Ok )
2025-06-29 22:57:54 -05:00
}
2025-11-30 16:16:03 -03:00
} )
. build ( ) ,
2025-06-29 22:57:54 -05:00
) ;
2025-11-30 16:16:03 -03:00
pipeline
. set_state ( gst ::State ::Playing )
2025-06-29 22:57:54 -05:00
. context ( " starting audio pipeline " ) ? ;
2026-04-21 12:06:40 -03:00
spawn_audio_source_watchdog (
pipeline . clone ( ) ,
source_health ,
tx . clone ( ) ,
alsa_dev . to_string ( ) ,
) ;
2025-06-29 17:24:19 -05:00
Ok ( AudioStream {
2025-06-29 22:57:54 -05:00
_pipeline : pipeline ,
2025-11-30 16:16:03 -03:00
inner : ReceiverStream ::new ( rx ) ,
2025-06-29 17:24:19 -05:00
} )
2025-06-29 03:46:34 -05:00
}
2025-06-30 01:19:28 -05:00
2025-06-30 14:20:07 -05:00
/* ────────────────────────── build_pipeline_desc ─────────────────────────── */
2026-04-13 02:52:32 -03:00
#[ cfg(not(coverage)) ]
2025-06-30 01:19:28 -05:00
fn build_pipeline_desc ( dev : & str ) -> anyhow ::Result < String > {
2025-06-30 11:38:57 -05:00
let reg = gst ::Registry ::get ( ) ;
2025-06-30 12:34:27 -05:00
2025-06-30 14:20:07 -05:00
// first available encoder
let enc = [ " fdkaacenc " , " voaacenc " , " avenc_aac " ]
2025-06-30 01:19:28 -05:00
. into_iter ( )
2025-06-30 11:38:57 -05:00
. find ( | & e | {
reg . find_plugin ( e ) . is_some ( )
2025-11-30 16:16:03 -03:00
| | reg . find_feature ( e , ElementFactory ::static_type ( ) ) . is_some ( )
2025-06-30 11:38:57 -05:00
} )
. ok_or_else ( | | anyhow! ( " no AAC encoder plugin available " ) ) ? ;
2025-06-30 01:19:28 -05:00
Ok ( format! (
2025-06-30 14:20:07 -05:00
concat! (
2026-04-21 13:31:49 -03:00
" alsasrc device= \" {dev} \" do-timestamp=true ! " ,
2025-06-30 14:20:07 -05:00
" audio/x-raw,format=S16LE,channels=2,rate=48000 ! " ,
2026-04-21 12:06:40 -03:00
" level name=source_level interval=1000000000 message=true ! " ,
2025-06-30 14:20:07 -05:00
" audioconvert ! audioresample ! {enc} bitrate=192000 ! " ,
" aacparse ! " ,
" capsfilter caps=audio/mpeg,stream-format=adts,channels=2,rate=48000 ! " ,
" tee name=t " ,
" t. ! queue ! appsink name=asink emit-signals=true " ,
" t. ! queue ! appsink name=debugtap emit-signals=true max-buffers=500 drop=true "
) ,
dev = dev ,
enc = enc
2025-06-30 01:19:28 -05:00
) )
2025-06-30 14:20:07 -05:00
}
2026-04-21 13:31:49 -03:00
#[ cfg(not(coverage)) ]
fn ensure_remote_usb_audio_ready ( alsa_dev : & str ) -> anyhow ::Result < ( ) > {
if ! alsa_dev_uses_remote_uac_gadget ( alsa_dev ) {
return Ok ( ( ) ) ;
}
let Some ( ( controller , state ) ) = current_usb_gadget_state ( ) ? else {
return Ok ( ( ) ) ;
} ;
if state = = " not attached " {
return Err ( anyhow! (
" remote USB gadget is not attached (UDC {controller} state={state}); remote speaker audio cannot stream until the controlled PC enumerates Lesavka USB "
) ) ;
}
Ok ( ( ) )
}
/// Tell whether `alsa_dev` refers to the remote UAC2 gadget.
///
/// A device qualifies when it equals one of the known full device names or
/// contains one of the gadget name fragments.
#[cfg(not(coverage))]
fn alsa_dev_uses_remote_uac_gadget(alsa_dev: &str) -> bool {
    const EXACT_NAMES: [&str; 2] = [" hw:UAC2Gadget,0 ", " hw:UAC2_Gadget,0 "];
    const NAME_FRAGMENTS: [&str; 2] = [" UAC2Gadget ", " UAC2_Gadget "];

    let is_exact = EXACT_NAMES.iter().any(|name| *name == alsa_dev);
    is_exact
        || NAME_FRAGMENTS
            .iter()
            .any(|fragment| alsa_dev.contains(*fragment))
}
/// Look up the bound UDC controller and its current state.
///
/// The configfs and sysfs roots come from the two `LESAVKA_GADGET_*`
/// environment variables, with built-in defaults when they are unset.
///
/// Returns `Ok(None)` when the gadget's `UDC` file cannot be read or is empty
/// after trimming; otherwise returns the trimmed controller name together
/// with the trimmed contents of that controller's `state` file.
///
/// # Errors
/// Fails when a controller is named but its state file cannot be read.
#[cfg(not(coverage))]
fn current_usb_gadget_state() -> anyhow::Result<Option<(String, String)>> {
    let configfs_root = match std::env::var(" LESAVKA_GADGET_CONFIGFS_ROOT ") {
        Ok(root) => root,
        Err(_) => " /sys/kernel/config/usb_gadget ".to_string(),
    };
    let sysfs_root = match std::env::var(" LESAVKA_GADGET_SYSFS_ROOT ") {
        Ok(root) => root,
        Err(_) => " /sys ".into(),
    };

    // An unreadable or blank UDC file means no controller is bound.
    let udc_path = format!(" {configfs_root} /lesavka/UDC ");
    let controller = match fs::read_to_string(udc_path) {
        Ok(contents) => contents.trim().to_string(),
        Err(_) => return Ok(None),
    };
    if controller.is_empty() {
        return Ok(None);
    }

    let state_path = format!(" {sysfs_root} /class/udc/ {controller} /state ");
    let raw_state = fs::read_to_string(state_path)
        .with_context(|| format!(" reading UDC state for {controller} "))?;
    let state = raw_state.trim().to_string();

    Ok(Some((controller, state)))
}
2026-04-21 12:06:40 -03:00
/// Liveness bookkeeping for the capture source: a sample counter, the time of
/// the latest sample, and a one-way "closed" latch.
#[cfg(not(coverage))]
struct AudioSourceHealth {
    /// Creation time; basis for `elapsed`.
    started_at: Instant,
    /// Time of the latest `mark_sample` call (creation time until then).
    last_sample_at: Mutex<Instant>,
    /// Number of `mark_sample` calls so far.
    packets: AtomicU64,
    /// Set by `signal_failure` and never cleared.
    closed: AtomicBool,
}

#[cfg(not(coverage))]
impl AudioSourceHealth {
    /// Start tracking from the current instant with no samples recorded.
    fn new() -> Self {
        let created = Instant::now();
        Self {
            closed: AtomicBool::new(false),
            packets: AtomicU64::new(0),
            last_sample_at: Mutex::new(created),
            started_at: created,
        }
    }

    /// Count one sample and stamp the current time as the latest activity.
    fn mark_sample(&self) {
        self.packets.fetch_add(1, Ordering::Relaxed);
        // A poisoned lock only skips the timestamp update.
        let Ok(mut stamp) = self.last_sample_at.lock() else {
            return;
        };
        *stamp = Instant::now();
    }

    /// Whether the latch has been set.
    fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Relaxed)
    }

    /// Set the latch; returns `true` only for the call that set it first.
    fn signal_failure(&self) -> bool {
        let already_closed = self.closed.swap(true, Ordering::Relaxed);
        !already_closed
    }

    /// Time since this tracker was created.
    fn elapsed(&self) -> Duration {
        self.started_at.elapsed()
    }

    /// Time since the latest sample, or zero when the lock is poisoned.
    fn idle_for(&self) -> Duration {
        match self.last_sample_at.lock() {
            Ok(stamp) => stamp.elapsed(),
            Err(_) => Duration::from_secs(0),
        }
    }

    /// Number of samples recorded so far.
    fn packets(&self) -> u64 {
        self.packets.load(Ordering::Relaxed)
    }
}
/// Thresholds applied by the audio source watchdog.
#[cfg(not(coverage))]
#[derive(Clone, Copy)]
struct AudioWatchdogPolicy {
    /// Time after start during which no checks are made.
    startup_grace: Duration,
    /// Longest tolerated gap since the latest sample.
    idle_timeout: Duration,
    /// Lowest tolerated packet rate.
    min_packets_per_second: u64,
}

#[cfg(not(coverage))]
impl AudioWatchdogPolicy {
    /// Read the thresholds from the environment, using the built-in defaults
    /// (3000 ms grace, 1500 ms idle, 20 packets/s) for anything not provided.
    fn from_env() -> Self {
        let startup_grace = env_duration_ms(" LESAVKA_AUDIO_SOURCE_GRACE_MS ", 3_000);
        let idle_timeout = env_duration_ms(" LESAVKA_AUDIO_SOURCE_IDLE_MS ", 1_500);
        let min_packets_per_second = env_u64(" LESAVKA_AUDIO_MIN_PACKETS_PER_SEC ", 20);
        Self {
            startup_grace,
            idle_timeout,
            min_packets_per_second,
        }
    }
}
/// Read `name` as a millisecond count via `env_u64` and return it as a
/// `Duration`, using `default_ms` as the fallback value.
#[cfg(not(coverage))]
fn env_duration_ms(name: &str, default_ms: u64) -> Duration {
    let millis = env_u64(name, default_ms);
    Duration::from_millis(millis)
}
/// Read the environment variable `name` as a strictly positive `u64`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, does
/// not parse as a `u64`, or parses to zero.
#[cfg(not(coverage))]
fn env_u64(name: &str, default: u64) -> u64 {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.parse::<u64>() {
        Ok(parsed) if parsed > 0 => parsed,
        _ => default,
    }
}
/// Watch the remote speaker capture source and fail fast when the USB audio
/// gadget is open but not producing real-time packets.
#[ cfg(not(coverage)) ]
fn spawn_audio_source_watchdog (
pipeline : gst ::Pipeline ,
health : Arc < AudioSourceHealth > ,
tx : tokio ::sync ::mpsc ::Sender < Result < AudioPacket , Status > > ,
alsa_dev : String ,
) {
let policy = AudioWatchdogPolicy ::from_env ( ) ;
std ::thread ::spawn ( move | | {
loop {
std ::thread ::sleep ( Duration ::from_millis ( 250 ) ) ;
if health . is_closed ( ) {
break ;
}
let elapsed = health . elapsed ( ) ;
if elapsed < policy . startup_grace {
continue ;
}
let packets = health . packets ( ) ;
let idle_for = health . idle_for ( ) ;
let rate = packets as f64 / elapsed . as_secs_f64 ( ) . max ( 0.001 ) ;
let failure = if packets = = 0 {
Some ( format! (
" remote speaker capture produced no audio samples after {} ms on {alsa_dev} " ,
elapsed . as_millis ( )
) )
} else if idle_for > = policy . idle_timeout {
Some ( format! (
" remote speaker capture stalled for {} ms on {alsa_dev} " ,
idle_for . as_millis ( )
) )
} else if ( packets / elapsed . as_secs ( ) . max ( 1 ) ) < policy . min_packets_per_second {
Some ( format! (
" remote speaker capture cadence is too low on {alsa_dev}: {rate:.1} packets/s, expected at least {} packets/s " ,
policy . min_packets_per_second
) )
} else {
None
} ;
if let Some ( message ) = failure {
if health . signal_failure ( ) {
warn! ( " 🔊🛟 {message}; restarting audio capture on next client reconnect " ) ;
let _ = pipeline . set_state ( gst ::State ::Null ) ;
let _ = tx . blocking_send ( Err ( Status ::unavailable ( message ) ) ) ;
}
break ;
}
}
} ) ;
}
2025-07-01 17:30:34 -05:00
// ────────────────────── minute-clip helper ───────────────────────────────
/// Rolling byte buffer that is periodically written to a file under `/tmp`.
pub struct ClipTap {
    /// Most recently fed bytes, trimmed as new data arrives.
    buf: Vec<u8>,
    /// Label embedded in the dump file name.
    tag: &'static str,
    /// Earliest instant at which the next periodic dump happens.
    next_dump: Instant,
    /// Interval between periodic dumps.
    period: Duration,
}
2025-06-30 14:20:07 -05:00
2025-07-01 17:30:34 -05:00
impl ClipTap {
pub fn new ( tag : & 'static str , period : Duration ) -> Self {
Self {
buf : Vec ::with_capacity ( 260_000 ) ,
tag ,
next_dump : Instant ::now ( ) + period ,
period ,
2025-06-30 14:20:07 -05:00
}
2025-07-01 17:30:34 -05:00
}
pub fn feed ( & mut self , bytes : & [ u8 ] ) {
self . buf . extend_from_slice ( bytes ) ;
if self . buf . len ( ) > 256_000 {
self . buf . drain ( .. self . buf . len ( ) - 256_000 ) ;
}
if Instant ::now ( ) > = self . next_dump {
self . flush ( ) ;
self . next_dump + = self . period ;
}
}
pub fn flush ( & mut self ) {
if self . buf . is_empty ( ) {
return ;
}
let ts = chrono ::Local ::now ( ) . format ( " %Y%m%d-%H%M%S " ) ;
let path = format! ( " /tmp/ {} - {} .aac " , self . tag , ts ) ;
2026-04-13 02:52:32 -03:00
let _ = std ::fs ::write ( & path , & self . buf ) ;
2025-07-01 17:30:34 -05:00
self . buf . clear ( ) ;
}
}
/// Dumps whatever is still buffered when the tap is dropped.
impl Drop for ClipTap {
    fn drop(&mut self) {
        self.flush();
    }
}
// ────────────────────── microphone sink ────────────────────────────────
pub struct Voice {
appsrc : gst_app ::AppSrc ,
2025-11-30 16:16:03 -03:00
_pipe : gst ::Pipeline , // keep pipeline alive
tap : ClipTap ,
2025-07-01 17:30:34 -05:00
}
2025-06-30 14:20:07 -05:00
2025-07-01 17:30:34 -05:00
impl Voice {
2026-04-13 02:52:32 -03:00
#[ cfg(coverage) ]
pub async fn new ( _alsa_dev : & str ) -> anyhow ::Result < Self > {
gst ::init ( ) . context ( " gst init " ) ? ;
let pipeline = gst ::Pipeline ::new ( ) ;
let appsrc = gst ::ElementFactory ::make ( " appsrc " )
. build ( )
. context ( " make appsrc " ) ?
. downcast ::< gst_app ::AppSrc > ( )
. expect ( " appsrc " ) ;
appsrc . set_format ( gst ::Format ::Time ) ;
appsrc . set_is_live ( true ) ;
let sink = gst ::ElementFactory ::make ( " fakesink " )
. build ( )
. context ( " make fakesink " ) ? ;
pipeline . add_many ( & [ appsrc . upcast_ref ( ) , & sink ] ) ? ;
gst ::Element ::link_many ( & [ appsrc . upcast_ref ( ) , & sink ] ) ? ;
pipeline . set_state ( gst ::State ::Playing ) ? ;
Ok ( Self {
appsrc ,
_pipe : pipeline ,
tap : ClipTap ::new ( " voice " , Duration ::from_secs ( 60 ) ) ,
} )
}
#[ cfg(not(coverage)) ]
2025-07-01 17:30:34 -05:00
pub async fn new ( alsa_dev : & str ) -> anyhow ::Result < Self > {
use gst ::prelude ::* ;
gst ::init ( ) . context ( " gst init " ) ? ;
// pipeline
let pipeline = gst ::Pipeline ::new ( ) ;
// elements
2025-11-30 16:16:03 -03:00
let appsrc = gst ::ElementFactory ::make ( " appsrc " )
2025-07-01 17:30:34 -05:00
. build ( )
. context ( " make appsrc " ) ?
. downcast ::< gst_app ::AppSrc > ( )
2025-06-30 14:20:07 -05:00
. unwrap ( ) ;
2025-07-01 17:30:34 -05:00
// dedicated AppSrc helpers exist and avoid the needless `?`
appsrc . set_format ( gst ::Format ::Time ) ;
appsrc . set_is_live ( true ) ;
2025-11-30 16:16:03 -03:00
let decodebin = gst ::ElementFactory ::make ( " decodebin " )
2025-07-01 17:30:34 -05:00
. build ( )
. context ( " make decodebin " ) ? ;
2026-04-08 20:00:14 -03:00
let convert = gst ::ElementFactory ::make ( " audioconvert " )
. build ( )
. context ( " make audioconvert " ) ? ;
let resample = gst ::ElementFactory ::make ( " audioresample " )
. build ( )
. context ( " make audioresample " ) ? ;
let caps = gst ::Caps ::builder ( " audio/x-raw " )
. field ( " format " , " S16LE " )
. field ( " channels " , 2 i32 )
. field ( " rate " , 48_000 i32 )
. build ( ) ;
let capsfilter = gst ::ElementFactory ::make ( " capsfilter " )
. property ( " caps " , & caps )
. build ( )
. context ( " make capsfilter " ) ? ;
2025-11-30 16:16:03 -03:00
let alsa_sink = gst ::ElementFactory ::make ( " alsasink " )
2025-07-01 17:30:34 -05:00
. build ( )
. context ( " make alsasink " ) ? ;
alsa_sink . set_property ( " device " , & alsa_dev ) ;
2026-04-20 12:12:29 -03:00
alsa_sink . set_property ( " sync " , false ) ;
alsa_sink . set_property ( " async " , false ) ;
alsa_sink . set_property ( " enable-last-sample " , false ) ;
2025-07-01 17:30:34 -05:00
2026-04-08 20:00:14 -03:00
pipeline . add_many ( & [
appsrc . upcast_ref ( ) ,
& decodebin ,
& convert ,
& resample ,
& capsfilter ,
& alsa_sink ,
] ) ? ;
2025-07-01 17:30:34 -05:00
appsrc . link ( & decodebin ) ? ;
2026-04-08 20:00:14 -03:00
gst ::Element ::link_many ( & [ & convert , & resample , & capsfilter , & alsa_sink ] ) ? ;
2025-07-01 17:30:34 -05:00
/* ------------ decodebin autolink ---------------- */
2026-04-08 20:00:14 -03:00
let convert_sink = convert
. static_pad ( " sink " )
. context ( " audioconvert sink pad " ) ? ;
2025-07-01 17:30:34 -05:00
decodebin . connect_pad_added ( move | _db , pad | {
2026-04-08 20:00:14 -03:00
if convert_sink . is_linked ( ) {
return ;
2025-07-01 17:30:34 -05:00
}
2026-04-08 20:00:14 -03:00
let caps = pad . current_caps ( ) . unwrap_or_else ( | | pad . query_caps ( None ) ) ;
let is_audio = caps
. structure ( 0 )
. map ( | s | s . name ( ) . starts_with ( " audio/ " ) )
. unwrap_or ( false ) ;
if ! is_audio {
return ;
}
let _ = pad . link ( & convert_sink ) ;
2025-07-01 17:30:34 -05:00
} ) ;
// underrun ≠ error – just show a warning
2025-07-01 18:21:06 -05:00
// let _id = alsa_sink.connect("underrun", false, |_| {
// tracing::warn!("⚠️ USB playback underrun – host muted/not reading");
// None
// });
2025-07-01 17:30:34 -05:00
pipeline . set_state ( gst ::State ::Playing ) ? ;
Ok ( Self {
appsrc ,
_pipe : pipeline ,
tap : ClipTap ::new ( " voice " , Duration ::from_secs ( 60 ) ) ,
} )
}
pub fn push ( & mut self , pkt : & AudioPacket ) {
self . tap . feed ( & pkt . data ) ;
let mut buf = gst ::Buffer ::from_slice ( pkt . data . clone ( ) ) ;
buf . get_mut ( )
. unwrap ( )
. set_pts ( Some ( gst ::ClockTime ::from_useconds ( pkt . pts ) ) ) ;
2026-04-13 02:52:32 -03:00
let _ = self . appsrc . push_buffer ( buf ) ;
2025-07-01 17:30:34 -05:00
}
pub fn finish ( & mut self ) {
self . tap . flush ( ) ;
let _ = self . appsrc . end_of_stream ( ) ;
2025-06-30 14:20:07 -05:00
}
2025-07-01 17:30:34 -05:00
}
2026-04-15 04:11:47 -03:00
#[cfg(all(test, coverage))]
mod tests {
    use super::Voice;

    /// The stub constructor must yield a `Voice` that accepts `finish`.
    #[tokio::test]
    async fn coverage_voice_constructor_starts_stub_pipeline() {
        let built = Voice::new(" coverage-audio ").await;
        let mut voice = built.expect(" voice ");
        voice.finish();
    }
}
2026-04-21 13:31:49 -03:00
#[cfg(all(test, not(coverage)))]
mod tests {
    use super::{build_pipeline_desc, ensure_remote_usb_audio_ready};
    use temp_env::with_vars;
    use tempfile::tempdir;

    /// The description must keep its transport and probe elements; a host
    /// without any AAC encoder is tolerated as long as the error says so.
    #[test]
    fn speaker_downlink_pipeline_keeps_aac_adts_transport_and_level_probe() {
        let _ = super::gst::init();

        let desc = match build_pipeline_desc(" hw:Loopback,0 ") {
            Ok(desc) => desc,
            Err(err) => {
                assert!(
                    err.to_string().contains(" no AAC encoder plugin available "),
                    " unexpected build failure: {err:#} "
                );
                return;
            }
        };

        assert!(desc.contains(" alsasrc device= \" hw:Loopback,0 \" "));
        assert!(desc.contains(" audio/x-raw,format=S16LE,channels=2,rate=48000 "));
        assert!(desc.contains(" aacparse "));
        assert!(desc.contains(" stream-format=adts "));
        assert!(desc.contains(" level name=source_level "));
        assert!(desc.contains(" appsink name=asink "));
    }

    /// A gadget whose UDC state file says "not attached" must block capture.
    #[test]
    fn remote_usb_audio_reports_not_attached_gadget() {
        // Fake configfs/sysfs trees inside a scratch directory.
        let scratch = tempdir().expect(" tempdir ");
        let cfg_root = scratch.path().join(" cfg ");
        let sys_root = scratch.path().join(" sys ");
        let gadget_dir = cfg_root.join(" lesavka ");
        let udc_dir = sys_root.join(" class/udc/fake-ctrl.usb ");

        std::fs::create_dir_all(&gadget_dir).expect(" cfg ");
        std::fs::create_dir_all(&udc_dir).expect(" udc ");
        std::fs::write(cfg_root.join(" lesavka/UDC "), " fake-ctrl.usb \n ").expect(" udc file ");
        std::fs::write(udc_dir.join(" state "), " not attached \n ").expect(" state ");

        let overrides = [
            (
                " LESAVKA_GADGET_CONFIGFS_ROOT ",
                Some(cfg_root.to_string_lossy().into_owned()),
            ),
            (
                " LESAVKA_GADGET_SYSFS_ROOT ",
                Some(sys_root.to_string_lossy().into_owned()),
            ),
        ];

        with_vars(overrides, || {
            let err = ensure_remote_usb_audio_ready(" hw:UAC2Gadget,0 ")
                .expect_err(" not attached gadget should block remote speaker audio ");
            let rendered = err.to_string();
            assert!(rendered.contains(" remote USB gadget is not attached "));
        });
    }

    /// Devices other than the gadget are never blocked by the gadget check.
    #[test]
    fn remote_usb_audio_allows_non_gadget_override() {
        let outcome = ensure_remote_usb_audio_ready(" hw:Loopback,0 ");
        outcome.expect(" non-gadget override ");
    }
}