use super::video_packets::{ ProbeFrameKind, VideoPacketSource, build_video_packet_source, probe_frame_kind, stop_video_packet_source, video_packet_data, }; use super::*; #[cfg(test)] pub(super) fn rebase_probe_packet_pts( pts_rebaser: &crate::live_capture_clock::DurationPacedSourcePtsRebaser, source_pts_us: u64, lag_cap: Duration, ) -> u64 { pts_rebaser .rebase_with_packet_duration(Some(source_pts_us), 1, lag_cap) .packet_pts_us } pub struct SyncProbeCapture { running: Arc, probe_start: Instant, start_unix_ns: u64, video_queue: FreshPacketQueue, audio_queue: FreshPacketQueue, video_thread: Option>, audio_thread: Option>, } impl SyncProbeCapture { pub fn new(camera: CameraConfig, schedule: PulseSchedule, duration: Duration) -> Result { gst::init().context("gst init")?; let running = Arc::new(AtomicBool::new(true)); let video_queue = FreshPacketQueue::new(PROBE_VIDEO_QUEUE); let audio_queue = FreshPacketQueue::new(PROBE_AUDIO_QUEUE); let packet_source = build_video_packet_source(camera, &schedule)?; let start_unix_ns = super::unix_now_ns(); let probe_start = Instant::now(); let video_thread = spawn_video_thread(VideoThreadConfig { packet_source, camera, schedule: schedule.clone(), duration, probe_start, running: running.clone(), queue: video_queue.clone(), }); let audio_thread = spawn_audio_thread( schedule, duration, probe_start, running.clone(), audio_queue.clone(), ); Ok(Self { running, probe_start, start_unix_ns, video_queue, audio_queue, video_thread: Some(video_thread), audio_thread: Some(audio_thread), }) } pub fn video_queue(&self) -> FreshPacketQueue { self.video_queue.clone() } pub fn audio_queue(&self) -> FreshPacketQueue { self.audio_queue.clone() } pub fn start_unix_ns(&self) -> u64 { self.start_unix_ns } /// Return the local monotonic instant associated with synthetic PTS zero. /// /// Inputs: none. /// Outputs: the `Instant` used to pace the probe's capture threads. /// Why: the transport sender needs send-age telemetry on the same clock as /// synthetic capture PTS, not the global physical-capture clock. pub fn probe_start(&self) -> Instant { self.probe_start } } impl Drop for SyncProbeCapture { fn drop(&mut self) { self.running.store(false, Ordering::Release); self.video_queue.close(); self.audio_queue.close(); if let Some(handle) = self.video_thread.take() { let _ = handle.join(); } if let Some(handle) = self.audio_thread.take() { let _ = handle.join(); } } } struct VideoThreadConfig { packet_source: VideoPacketSource, camera: CameraConfig, schedule: PulseSchedule, duration: Duration, probe_start: Instant, running: Arc, queue: FreshPacketQueue, } /// Raw RGB signatures needed only by live encoder probe paths. /// /// Inputs: camera dimensions and event-code schedule. /// Outputs: reusable frame buffers for H.264 encoding. /// Why: MJPEG probes already carry pre-encoded signatures, so building these /// large raw frames for MJPEG would delay the synthetic capture clock. struct RawProbeFrames { dark_frame: Vec, regular_pulse_frame: Vec, marker_pulse_frame: Vec, coded_pulse_frames: BTreeMap>, } impl RawProbeFrames { /// Build raw frames only when the packet source still needs them. /// /// Inputs: packet source, camera profile, and pulse schedule. /// Outputs: raw RGB frames for live encoders or `None` for pre-encoded MJPEG. /// Why: client-origin freshness must not include one-time test-pattern /// setup work that would never happen inside physical camera capture. fn maybe_new( packet_source: &VideoPacketSource, camera: CameraConfig, schedule: &PulseSchedule, ) -> Option { if !matches!(packet_source, VideoPacketSource::Pipeline { .. }) { return None; } Some(Self { dark_frame: build_dark_probe_frame(camera.width as usize, camera.height as usize), regular_pulse_frame: build_regular_probe_frame( camera.width as usize, camera.height as usize, ), marker_pulse_frame: build_marker_probe_frame( camera.width as usize, camera.height as usize, ), coded_pulse_frames: schedule .event_width_codes() .iter() .copied() .map(|code| { ( code, build_coded_probe_frame( camera.width as usize, camera.height as usize, code, ), ) }) .collect::>(), }) } /// Return the raw RGB frame for one probe frame kind. /// /// Inputs: selected frame kind from the shared pulse schedule. /// Outputs: borrowed raw frame bytes. /// Why: H.264 encoding still needs the same visual signatures as MJPEG so /// analyzer identity stays comparable across codecs. fn frame(&self, frame_kind: ProbeFrameKind) -> &[u8] { match frame_kind { ProbeFrameKind::Dark => &self.dark_frame, ProbeFrameKind::RegularPulse => &self.regular_pulse_frame, ProbeFrameKind::MarkerPulse => &self.marker_pulse_frame, ProbeFrameKind::Coded(code) => self .coded_pulse_frames .get(&code) .unwrap_or(&self.regular_pulse_frame), } } } fn spawn_video_thread(config: VideoThreadConfig) -> JoinHandle<()> { let VideoThreadConfig { mut packet_source, camera, schedule, duration, probe_start, running, queue, } = config; thread::spawn(move || { let raw_frames = RawProbeFrames::maybe_new(&packet_source, camera, &schedule); let frame_step = Duration::from_nanos(1_000_000_000u64 / u64::from(camera.fps.max(1))); let mut frame_index = 0u64; while running.load(Ordering::Acquire) { let pts = schedule.frame_pts(frame_index, camera.fps.max(1)); if pts > duration { break; } let deadline = probe_start + pts; if let Some(remaining) = deadline.checked_duration_since(Instant::now()) && !remaining.is_zero() { thread::sleep(remaining); } let frame_kind = probe_frame_kind(&schedule, pts); let raw_frame = raw_frames .as_ref() .map(|frames| frames.frame(frame_kind)) .unwrap_or(&[]); if let Some(encoded) = video_packet_data(&mut packet_source, frame_kind, raw_frame, pts, frame_step) { let source_pts_us = encoded.pts.as_micros().min(u64::MAX as u128) as u64; let packet = VideoPacket { id: 2, pts: source_pts_us, data: encoded.data, ..Default::default() }; let _ = queue.push(packet, Duration::ZERO); } frame_index = frame_index.saturating_add(1); } stop_video_packet_source(packet_source); queue.close(); }) } fn spawn_audio_thread( schedule: PulseSchedule, duration: Duration, probe_start: Instant, running: Arc, queue: FreshPacketQueue, ) -> JoinHandle<()> { thread::spawn(move || { let chunk_duration = Duration::from_millis(AUDIO_CHUNK_MS); let samples_per_chunk = (AUDIO_SAMPLE_RATE as usize * AUDIO_CHUNK_MS as usize / 1_000).max(1); let mut chunk_index = 0u64; while running.load(Ordering::Acquire) { let pts = chunk_duration.saturating_mul(chunk_index as u32); if pts > duration { break; } let deadline = probe_start + pts; if let Some(remaining) = deadline.checked_duration_since(Instant::now()) && !remaining.is_zero() { thread::sleep(remaining); } let chunk = render_audio_chunk(&schedule, pts, samples_per_chunk); let source_pts_us = pts.as_micros().min(u64::MAX as u128) as u64; let packet = AudioPacket { id: 0, pts: source_pts_us, data: chunk, ..Default::default() }; let _ = queue.push(packet, Duration::ZERO); chunk_index = chunk_index.saturating_add(1); } queue.close(); }) }