use anyhow::{Context, Result, anyhow}; use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; use gtk::{gdk, glib}; use shell_escape::escape; use std::borrow::Cow; use std::fs; use std::process::{Child, Command}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; const CAMERA_PREVIEW_WIDTH: i32 = 360; const CAMERA_PREVIEW_HEIGHT: i32 = 202; const CAMERA_PREVIEW_IDLE: &str = "Select a webcam and click Start Preview."; const MIC_MONITOR_RATE: i32 = 16_000; const MIC_MONITOR_CHANNELS: i32 = 1; const MIC_MONITOR_SAMPLE_BYTES: usize = 2; const MIC_REPLAY_SECONDS: usize = 3; const MIC_REPLAY_PATH: &str = "/tmp/lesavka-mic-replay.wav"; const MIC_REPLAY_MAX_BYTES: usize = MIC_MONITOR_RATE as usize * MIC_MONITOR_CHANNELS as usize * MIC_MONITOR_SAMPLE_BYTES * MIC_REPLAY_SECONDS; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum DeviceTestKind { Camera, Microphone, MicrophoneReplay, Speaker, } pub struct DeviceTestController { camera: Option, selected_camera: Option, microphone: Option, speaker: Option, microphone_replay: Option, microphone_buffer: Arc>>, microphone_level: Arc>, } impl Default for DeviceTestController { fn default() -> Self { Self { camera: None, selected_camera: None, microphone: None, speaker: None, microphone_replay: None, microphone_buffer: Arc::new(Mutex::new(Vec::new())), microphone_level: Arc::new(Mutex::new(0.0)), } } } impl DeviceTestController { pub fn new() -> Self { Self::default() } pub fn bind_camera_preview( &mut self, camera_picture: >k::Picture, camera_status: >k::Label, ) -> Result<()> { if let Some(camera) = self.camera.as_mut() { camera.stop(); } let mut preview = LocalCameraPreview::new(camera_picture, camera_status); preview.set_selected(self.selected_camera.as_deref())?; self.camera = Some(preview); Ok(()) } pub fn is_running(&mut self, kind: DeviceTestKind) -> bool { self.cleanup_finished(); match kind { DeviceTestKind::Camera => self .camera .as_ref() .is_some_and(LocalCameraPreview::is_running), DeviceTestKind::Microphone => self .microphone .as_ref() .is_some_and(LocalMicrophoneMonitor::is_running), DeviceTestKind::MicrophoneReplay => self.microphone_replay.is_some(), DeviceTestKind::Speaker => self.speaker.is_some(), } } pub fn set_camera_selection(&mut self, camera: Option<&str>) -> Result<()> { self.selected_camera = normalize_camera_selection(camera); if let Some(preview) = self.camera.as_mut() { preview.set_selected(self.selected_camera.as_deref())?; } Ok(()) } pub fn toggle_camera(&mut self) -> Result { let preview = self .camera .as_mut() .ok_or_else(|| anyhow!("camera preview panel is not ready yet"))?; preview.toggle() } pub fn toggle_microphone(&mut self, source: Option<&str>, sink: Option<&str>) -> Result { self.cleanup_finished(); if self.microphone.is_some() { self.stop(DeviceTestKind::Microphone); return Ok(false); } let monitor = LocalMicrophoneMonitor::start( source, sink, Arc::clone(&self.microphone_buffer), Arc::clone(&self.microphone_level), )?; self.microphone = Some(monitor); Ok(true) } pub fn toggle_speaker(&mut self, sink: Option<&str>) -> Result { self.toggle_child(DeviceTestKind::Speaker, build_speaker_test(sink)) } pub fn toggle_microphone_replay(&mut self, sink: Option<&str>) -> Result { self.cleanup_finished(); if self.microphone_replay.is_some() { self.stop(DeviceTestKind::MicrophoneReplay); return Ok(false); } let wav_bytes = self.replay_wav_bytes()?; fs::write(MIC_REPLAY_PATH, wav_bytes).context("writing microphone replay clip")?; let child = build_microphone_replay_test(MIC_REPLAY_PATH, sink)? .spawn() .context("starting microphone replay")?; self.microphone_replay = Some(child); Ok(true) } pub fn microphone_level_fraction(&mut self) -> f64 { self.cleanup_finished(); self.microphone_level .lock() .map(|value| (*value).clamp(0.0, 1.0)) .unwrap_or(0.0) } pub fn microphone_replay_ready(&mut self) -> bool { self.cleanup_finished(); self.microphone_buffer .lock() .map(|buffer| !buffer.is_empty()) .unwrap_or(false) } pub fn stop_all(&mut self) { if let Some(camera) = self.camera.as_mut() { camera.stop(); } for kind in [ DeviceTestKind::Microphone, DeviceTestKind::MicrophoneReplay, DeviceTestKind::Speaker, ] { self.stop(kind); } } fn toggle_child(&mut self, kind: DeviceTestKind, command: Result) -> Result { self.cleanup_finished(); if self.slot(kind).is_some() { self.stop(kind); return Ok(false); } let child = command? .spawn() .with_context(|| format!("starting {kind:?} test"))?; *self.slot_mut(kind) = Some(child); Ok(true) } fn stop(&mut self, kind: DeviceTestKind) { match kind { DeviceTestKind::Camera => panic!("camera preview is not stopped through this path"), DeviceTestKind::Microphone => { if let Some(mut monitor) = self.microphone.take() { monitor.stop(); } if let Ok(mut level) = self.microphone_level.lock() { *level = 0.0; } } DeviceTestKind::MicrophoneReplay | DeviceTestKind::Speaker => { if let Some(mut child) = self.slot_mut(kind).take() { let _ = child.kill(); let _ = child.wait(); } } } } fn cleanup_finished(&mut self) { if self .microphone .as_mut() .is_some_and(|monitor| !monitor.is_running()) { self.microphone = None; } for kind in [DeviceTestKind::MicrophoneReplay, DeviceTestKind::Speaker] { let finished = self .slot_mut(kind) .as_mut() .and_then(|child| child.try_wait().ok()) .flatten() .is_some(); if finished { let _ = self.slot_mut(kind).take(); } } } fn slot(&self, kind: DeviceTestKind) -> &Option { match kind { DeviceTestKind::Camera | DeviceTestKind::Microphone => { panic!("this device test is not an external child process") } DeviceTestKind::MicrophoneReplay => &self.microphone_replay, DeviceTestKind::Speaker => &self.speaker, } } fn slot_mut(&mut self, kind: DeviceTestKind) -> &mut Option { match kind { DeviceTestKind::Camera | DeviceTestKind::Microphone => { panic!("this device test is not an external child process") } DeviceTestKind::MicrophoneReplay => &mut self.microphone_replay, DeviceTestKind::Speaker => &mut self.speaker, } } fn replay_wav_bytes(&self) -> Result> { let audio = self .microphone_buffer .lock() .map_err(|_| anyhow!("microphone replay buffer is unavailable right now"))? .clone(); if audio.is_empty() { return Err(anyhow!( "Monitor Mic long enough to capture audio before replaying the last 3 seconds." )); } Ok(build_wav_bytes( &audio, MIC_MONITOR_RATE as u32, MIC_MONITOR_CHANNELS as u16, 16, )) } } struct LocalCameraPreview { latest: Arc>>, status_text: Arc>, generation: Arc, running: Arc, selected_device: Option, } struct LocalMicrophoneMonitor { running: Arc, generation: Arc, } struct PreviewFrame { width: i32, height: i32, stride: usize, rgba: Vec, } impl LocalCameraPreview { fn new(picture: >k::Picture, status_label: >k::Label) -> Self { let latest = Arc::new(Mutex::new(None::)); let status_text = Arc::new(Mutex::new(CAMERA_PREVIEW_IDLE.to_string())); let generation = Arc::new(AtomicU64::new(0)); let running = Arc::new(AtomicBool::new(false)); picture.set_paintable(Some(&blank_camera_preview_texture())); { let picture = picture.clone(); let status_label = status_label.clone(); let latest = Arc::clone(&latest); let status_text = Arc::clone(&status_text); glib::timeout_add_local(Duration::from_millis(120), move || { let next = latest.lock().ok().and_then(|mut slot| slot.take()); if let Some(frame) = next { let bytes = glib::Bytes::from_owned(frame.rgba); let texture = gdk::MemoryTexture::new( frame.width, frame.height, gdk::MemoryFormat::R8g8b8a8, &bytes, frame.stride, ); picture.set_paintable(Some(&texture)); } if let Ok(text) = status_text.lock() { status_label.set_text(text.as_str()); } glib::ControlFlow::Continue }); } Self { latest, status_text, generation, running, selected_device: None, } } fn is_running(&self) -> bool { self.running.load(Ordering::Acquire) } fn set_selected(&mut self, camera: Option<&str>) -> Result<()> { self.selected_device = normalize_camera_selection(camera); if self.is_running() { self.stop(); self.start()?; return Ok(()); } self.set_status(match self.selected_device.as_deref() { Some(camera) => format!( "Selected {camera}. Start Preview to confirm webcam framing here before you launch the relay." ), None => CAMERA_PREVIEW_IDLE.to_string(), }); Ok(()) } fn toggle(&mut self) -> Result { if self.is_running() { self.stop(); return Ok(false); } self.start()?; Ok(true) } fn start(&mut self) -> Result<()> { gst::init().context("initialising in-launcher camera preview")?; let selected = self .selected_device .clone() .ok_or_else(|| anyhow!("select a camera before starting the in-launcher preview"))?; let device = resolve_camera_device(&selected); let latest = Arc::clone(&self.latest); let status_text = Arc::clone(&self.status_text); let generation = Arc::clone(&self.generation); let running = Arc::clone(&self.running); let token = generation.fetch_add(1, Ordering::AcqRel) + 1; running.store(true, Ordering::Release); self.set_status(format!("Starting local preview for {selected}...")); std::thread::spawn(move || { if let Err(err) = run_camera_preview_feed( selected, device, token, latest, status_text.clone(), generation.clone(), running.clone(), ) && generation.load(Ordering::Acquire) == token { running.store(false, Ordering::Release); if let Ok(mut status) = status_text.lock() { *status = format!("Camera preview failed: {err}"); } } }); Ok(()) } fn stop(&mut self) { self.running.store(false, Ordering::Release); self.generation.fetch_add(1, Ordering::AcqRel); if let Ok(mut latest) = self.latest.lock() { *latest = None; } self.set_status(match self.selected_device.as_deref() { Some(camera) => { format!("Local preview stopped. {camera} stays selected for the next relay launch.") } None => CAMERA_PREVIEW_IDLE.to_string(), }); } fn set_status(&self, text: String) { if let Ok(mut status) = self.status_text.lock() { *status = text; } } } fn blank_camera_preview_texture() -> gdk::MemoryTexture { let rgba = vec![12_u8; (CAMERA_PREVIEW_WIDTH * CAMERA_PREVIEW_HEIGHT * 4) as usize]; let bytes = glib::Bytes::from_owned(rgba); gdk::MemoryTexture::new( CAMERA_PREVIEW_WIDTH, CAMERA_PREVIEW_HEIGHT, gdk::MemoryFormat::R8g8b8a8, &bytes, (CAMERA_PREVIEW_WIDTH * 4) as usize, ) } impl LocalMicrophoneMonitor { fn start( source: Option<&str>, sink: Option<&str>, recent_audio: Arc>>, level: Arc>, ) -> Result { gst::init().context("initialising microphone preview")?; let source = source .filter(|value| !value.trim().is_empty()) .ok_or_else(|| anyhow!("select a microphone before starting Monitor Mic"))? .to_string(); let sink = sink .filter(|value| !value.trim().is_empty()) .map(ToOwned::to_owned); if let Ok(mut buffer) = recent_audio.lock() { buffer.clear(); } if let Ok(mut meter) = level.lock() { *meter = 0.0; } let running = Arc::new(AtomicBool::new(true)); let generation = Arc::new(AtomicU64::new(1)); let running_handle = Arc::clone(&running); let generation_handle = Arc::clone(&generation); let token = generation.load(Ordering::Acquire); std::thread::spawn(move || { let _ = run_microphone_monitor_feed( &source, sink.as_deref(), token, recent_audio, level, generation_handle, running_handle, ); }); Ok(Self { running, generation, }) } fn is_running(&self) -> bool { self.running.load(Ordering::Acquire) } fn stop(&mut self) { self.running.store(false, Ordering::Release); self.generation.fetch_add(1, Ordering::AcqRel); } } fn normalize_camera_selection(camera: Option<&str>) -> Option { camera .map(str::trim) .filter(|value| !value.is_empty() && !value.eq_ignore_ascii_case("auto")) .map(ToOwned::to_owned) } fn resolve_camera_device(camera: &str) -> String { if camera.starts_with("/dev/") { camera.to_string() } else { format!("/dev/v4l/by-id/{camera}") } } fn run_microphone_monitor_feed( source: &str, sink: Option<&str>, token: u64, recent_audio: Arc>>, level: Arc>, generation: Arc, running: Arc, ) -> Result<()> { let (pipeline, appsink) = build_microphone_monitor_pipeline(source, sink)?; pipeline .set_state(gst::State::Playing) .context("starting microphone preview pipeline")?; while running.load(Ordering::Acquire) && generation.load(Ordering::Acquire) == token { if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(250)) { if let Some(buffer) = sample.buffer() && let Ok(map) = buffer.map_readable() { let bytes = map.as_slice(); push_recent_audio(&recent_audio, bytes); update_microphone_level(&level, bytes); } } else if let Ok(mut meter) = level.lock() { *meter = (*meter * 0.8).clamp(0.0, 1.0); } } let _ = pipeline.set_state(gst::State::Null); if let Ok(mut meter) = level.lock() { *meter = 0.0; } running.store(false, Ordering::Release); Ok(()) } fn run_camera_preview_feed( selected: String, device: String, token: u64, latest: Arc>>, status_text: Arc>, generation: Arc, running: Arc, ) -> Result<()> { let (pipeline, appsink) = build_camera_preview_pipeline(&device)?; pipeline .set_state(gst::State::Playing) .context("starting in-launcher camera preview pipeline")?; if let Ok(mut status) = status_text.lock() { *status = format!("Local preview live for {selected}."); } while running.load(Ordering::Acquire) && generation.load(Ordering::Acquire) == token { if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(250)) && let Some(frame) = sample_to_frame(&sample) && let Ok(mut slot) = latest.lock() { *slot = Some(frame); } } let _ = pipeline.set_state(gst::State::Null); Ok(()) } fn build_camera_preview_pipeline(device: &str) -> Result<(gst::Pipeline, gst_app::AppSink)> { let desc = camera_preview_pipeline_desc(device); let pipeline = gst::parse::launch(&desc)? .downcast::() .expect("camera preview pipeline"); let appsink = pipeline .by_name("sink") .context("missing in-launcher camera preview appsink")? .downcast::() .expect("camera preview appsink"); appsink.set_caps(Some( &gst::Caps::builder("video/x-raw") .field("format", "RGBA") .field("width", CAMERA_PREVIEW_WIDTH) .field("height", CAMERA_PREVIEW_HEIGHT) .build(), )); Ok((pipeline, appsink)) } fn build_microphone_monitor_pipeline( source: &str, sink: Option<&str>, ) -> Result<(gst::Pipeline, gst_app::AppSink)> { let desc = microphone_monitor_pipeline_desc(source, sink); let pipeline = gst::parse::launch(&desc)? .downcast::() .expect("microphone monitor pipeline"); let appsink = pipeline .by_name("mic_preview_sink") .context("missing microphone preview appsink")? .downcast::() .expect("microphone preview appsink"); appsink.set_caps(Some( &gst::Caps::builder("audio/x-raw") .field("format", "S16LE") .field("rate", MIC_MONITOR_RATE) .field("channels", MIC_MONITOR_CHANNELS) .build(), )); Ok((pipeline, appsink)) } fn camera_preview_pipeline_desc(device: &str) -> String { let device = gst_quote(device); format!( "v4l2src device=\"{device}\" do-timestamp=true ! \ videoconvert ! videoscale ! videorate ! \ video/x-raw,format=RGBA,width={CAMERA_PREVIEW_WIDTH},height={CAMERA_PREVIEW_HEIGHT},framerate=30/1,pixel-aspect-ratio=1/1 ! \ appsink name=sink emit-signals=false sync=false max-buffers=1 drop=true" ) } fn microphone_monitor_pipeline_desc(source: &str, sink: Option<&str>) -> String { let source_element = if gst::ElementFactory::find("pipewiresrc").is_some() { let source = gst_quote(source); format!("pipewiresrc target-object=\"{source}\" do-timestamp=true") } else { let source = gst_quote(source); format!("pulsesrc device=\"{source}\" do-timestamp=true") }; let sink_prop = sink .map(gst_quote) .map(|value| format!(" device=\"{value}\"")) .unwrap_or_default(); format!( "{source_element} ! \ audioconvert ! audioresample ! \ audio/x-raw,format=S16LE,rate={MIC_MONITOR_RATE},channels={MIC_MONITOR_CHANNELS} ! \ tee name=t \ t. ! queue ! pulsesink{sink_prop} \ t. ! queue ! appsink name=mic_preview_sink emit-signals=false sync=false max-buffers=8 drop=true" ) } fn sample_to_frame(sample: &gst::Sample) -> Option { let caps = sample.caps()?; let structure = caps.structure(0)?; let width = structure.get::("width").ok()?; let height = structure.get::("height").ok()?; let buffer = sample.buffer()?; let map = buffer.map_readable().ok()?; let rgba = map.as_slice().to_vec(); let stride = rgba.len() / height.max(1) as usize; Some(PreviewFrame { width, height, stride, rgba, }) } fn gst_quote(value: &str) -> String { value.replace('\\', "\\\\").replace('"', "\\\"") } fn build_speaker_test(sink: Option<&str>) -> Result { let sink_prop = sink .filter(|value| !value.trim().is_empty()) .map(|value| format!("device={}", quote(value))) .unwrap_or_default(); Ok(shell_command(format!( "gst-launch-1.0 -q audiotestsrc is-live=true wave=sine freq=880 volume=0.25 ! audioconvert ! audioresample ! queue ! pulsesink {}", sink_prop ))) } fn build_microphone_replay_test(path: &str, sink: Option<&str>) -> Result { let sink_prop = sink .filter(|value| !value.trim().is_empty()) .map(|value| format!("device={}", quote(value))) .unwrap_or_default(); Ok(shell_command(format!( "gst-launch-1.0 -q filesrc location={} ! wavparse ! audioconvert ! audioresample ! queue ! pulsesink {}", quote(path), sink_prop ))) } fn shell_command(command: String) -> Command { let mut child = Command::new("bash"); child.args(["-lc", &command]); child } fn quote(value: impl Into) -> String { escape(Cow::Owned(value.into())).into_owned() } fn push_recent_audio(buffer: &Arc>>, bytes: &[u8]) { if let Ok(mut ring) = buffer.lock() { ring.extend_from_slice(bytes); if ring.len() > MIC_REPLAY_MAX_BYTES { let overflow = ring.len() - MIC_REPLAY_MAX_BYTES; ring.drain(0..overflow); } } } fn update_microphone_level(level: &Arc>, bytes: &[u8]) { let peak = bytes .chunks_exact(2) .map(|chunk| i16::from_le_bytes([chunk[0], chunk[1]]).unsigned_abs() as f64) .fold(0.0, f64::max) / i16::MAX as f64; if let Ok(mut meter) = level.lock() { *meter = peak.clamp(0.0, 1.0); } } fn build_wav_bytes(audio: &[u8], sample_rate: u32, channels: u16, bits_per_sample: u16) -> Vec { let block_align = channels * (bits_per_sample / 8); let byte_rate = sample_rate * block_align as u32; let data_len = audio.len() as u32; let riff_len = 36 + data_len; let mut wav = Vec::with_capacity(44 + audio.len()); wav.extend_from_slice(b"RIFF"); wav.extend_from_slice(&riff_len.to_le_bytes()); wav.extend_from_slice(b"WAVE"); wav.extend_from_slice(b"fmt "); wav.extend_from_slice(&16u32.to_le_bytes()); wav.extend_from_slice(&1u16.to_le_bytes()); wav.extend_from_slice(&channels.to_le_bytes()); wav.extend_from_slice(&sample_rate.to_le_bytes()); wav.extend_from_slice(&byte_rate.to_le_bytes()); wav.extend_from_slice(&block_align.to_le_bytes()); wav.extend_from_slice(&bits_per_sample.to_le_bytes()); wav.extend_from_slice(b"data"); wav.extend_from_slice(&data_len.to_le_bytes()); wav.extend_from_slice(audio); wav } #[cfg(test)] mod tests { use super::{ MIC_REPLAY_MAX_BYTES, build_wav_bytes, camera_preview_pipeline_desc, normalize_camera_selection, push_recent_audio, resolve_camera_device, }; use std::sync::{Arc, Mutex}; #[test] fn resolve_camera_device_accepts_explicit_paths_and_catalog_names() { assert_eq!(resolve_camera_device("/dev/video0"), "/dev/video0"); assert_eq!( resolve_camera_device("usb-Logitech_C920-video-index0"), "/dev/v4l/by-id/usb-Logitech_C920-video-index0" ); } #[test] fn normalize_camera_selection_drops_auto_and_blank_values() { assert_eq!(normalize_camera_selection(None), None); assert_eq!(normalize_camera_selection(Some("")), None); assert_eq!(normalize_camera_selection(Some("auto")), None); assert_eq!( normalize_camera_selection(Some("usb-Logitech_C920-video-index0")), Some("usb-Logitech_C920-video-index0".to_string()) ); } #[test] fn camera_preview_pipeline_scales_after_source_instead_of_pinning_raw_source_caps() { let desc = camera_preview_pipeline_desc("/dev/video0"); assert!(desc.contains("v4l2src device=\"/dev/video0\"")); assert!(desc.contains("videoconvert ! videoscale ! videorate !")); assert!(!desc.contains("v4l2src device=\"/dev/video0\" do-timestamp=true ! video/x-raw,")); } #[test] fn push_recent_audio_keeps_only_last_three_seconds() { let buffer = Arc::new(Mutex::new(Vec::new())); push_recent_audio(&buffer, &vec![1u8; MIC_REPLAY_MAX_BYTES / 2]); push_recent_audio(&buffer, &vec![2u8; MIC_REPLAY_MAX_BYTES]); let stored = buffer.lock().expect("buffer").clone(); assert_eq!(stored.len(), MIC_REPLAY_MAX_BYTES); assert!(stored.iter().any(|byte| *byte == 2)); } #[test] fn build_wav_bytes_writes_a_valid_riff_header() { let audio = vec![0u8; 32]; let wav = build_wav_bytes(&audio, 16_000, 1, 16); assert!(wav.starts_with(b"RIFF")); assert_eq!(&wav[8..12], b"WAVE"); assert_eq!(&wav[36..40], b"data"); assert_eq!(wav.len(), 44 + audio.len()); } }