285 lines
9.3 KiB
Rust

use super::video_packets::{
ProbeFrameKind, VideoPacketSource, build_video_packet_source, probe_frame_kind,
stop_video_packet_source, video_packet_data,
};
use super::*;
/// Rebase a probe packet's source PTS through the shared pacing rebaser.
///
/// Inputs: the duration-paced rebaser, the source PTS in microseconds, and
/// the lag cap to apply.
/// Outputs: the rebased packet PTS in microseconds.
#[cfg(test)]
pub(super) fn rebase_probe_packet_pts(
    pts_rebaser: &crate::live_capture_clock::DurationPacedSourcePtsRebaser,
    source_pts_us: u64,
    lag_cap: Duration,
) -> u64 {
    let rebased = pts_rebaser.rebase_with_packet_duration(Some(source_pts_us), 1, lag_cap);
    rebased.packet_pts_us
}
/// Synthetic probe capture session that pairs a paced video thread with a
/// paced audio thread feeding two packet queues.
pub struct SyncProbeCapture {
    // Shared stop flag; cleared in `Drop` so both worker loops exit.
    running: Arc<AtomicBool>,
    // Monotonic instant associated with synthetic PTS zero (see `probe_start()`).
    probe_start: Instant,
    // Unix wall-clock timestamp (ns) captured immediately before `probe_start`.
    start_unix_ns: u64,
    // Destination queue for encoded video packets.
    video_queue: FreshPacketQueue<VideoPacket>,
    // Destination queue for rendered audio packets.
    audio_queue: FreshPacketQueue<AudioPacket>,
    // Worker handles; `Option` so `Drop` can take and join them.
    video_thread: Option<JoinHandle<()>>,
    audio_thread: Option<JoinHandle<()>>,
}
impl SyncProbeCapture {
    /// Start a synthetic probe capture for the given camera and schedule.
    ///
    /// Inputs: camera profile, pulse schedule, and total probe duration.
    /// Outputs: a running capture whose queues fill with paced packets, or an
    /// error if GStreamer or the packet source fails to initialize.
    pub fn new(camera: CameraConfig, schedule: PulseSchedule, duration: Duration) -> Result<Self> {
        gst::init().context("gst init")?;
        let run_flag = Arc::new(AtomicBool::new(true));
        let video_queue = FreshPacketQueue::new(PROBE_VIDEO_QUEUE);
        let audio_queue = FreshPacketQueue::new(PROBE_AUDIO_QUEUE);
        let packet_source = build_video_packet_source(camera, &schedule)?;
        // Sample the wall clock and the monotonic origin back to back so both
        // describe (as closely as possible) the same moment: synthetic PTS zero.
        let start_unix_ns = super::unix_now_ns();
        let probe_start = Instant::now();
        let video_thread = spawn_video_thread(VideoThreadConfig {
            packet_source,
            camera,
            schedule: schedule.clone(),
            duration,
            probe_start,
            running: run_flag.clone(),
            queue: video_queue.clone(),
        });
        let audio_thread = spawn_audio_thread(
            schedule,
            duration,
            probe_start,
            run_flag.clone(),
            audio_queue.clone(),
        );
        Ok(Self {
            running: run_flag,
            probe_start,
            start_unix_ns,
            video_queue,
            audio_queue,
            video_thread: Some(video_thread),
            audio_thread: Some(audio_thread),
        })
    }

    /// Clone of the video packet queue fed by the capture thread.
    pub fn video_queue(&self) -> FreshPacketQueue<VideoPacket> {
        self.video_queue.clone()
    }

    /// Clone of the audio packet queue fed by the capture thread.
    pub fn audio_queue(&self) -> FreshPacketQueue<AudioPacket> {
        self.audio_queue.clone()
    }

    /// Unix wall-clock timestamp (nanoseconds) captured at probe start.
    pub fn start_unix_ns(&self) -> u64 {
        self.start_unix_ns
    }

    /// Return the local monotonic instant associated with synthetic PTS zero.
    ///
    /// Inputs: none.
    /// Outputs: the `Instant` used to pace the probe's capture threads.
    /// Why: the transport sender needs send-age telemetry on the same clock as
    /// synthetic capture PTS, not the global physical-capture clock.
    pub fn probe_start(&self) -> Instant {
        self.probe_start
    }
}
impl Drop for SyncProbeCapture {
    /// Stop both worker threads and wait for them to exit.
    fn drop(&mut self) {
        // Clear the stop flag first so each pacing loop exits at its next check.
        self.running.store(false, Ordering::Release);
        self.video_queue.close();
        self.audio_queue.close();
        // Join whichever handles are still present; join errors are ignored.
        for handle in [self.video_thread.take(), self.audio_thread.take()] {
            if let Some(handle) = handle {
                let _ = handle.join();
            }
        }
    }
}
/// Everything the video capture thread needs, moved into the thread as one value.
struct VideoThreadConfig {
    // Packet source the thread drains and finally stops.
    packet_source: VideoPacketSource,
    camera: CameraConfig,
    schedule: PulseSchedule,
    // Total probe length; the thread exits once frame PTS exceeds this.
    duration: Duration,
    // Monotonic pacing origin: each frame's deadline is `probe_start + pts`.
    probe_start: Instant,
    // Shared stop flag checked at the top of every loop iteration.
    running: Arc<AtomicBool>,
    // Destination queue for encoded video packets.
    queue: FreshPacketQueue<VideoPacket>,
}
/// Raw RGB signatures needed only by live encoder probe paths.
///
/// Inputs: camera dimensions and event-code schedule.
/// Outputs: reusable frame buffers for H.264 encoding.
/// Why: MJPEG probes already carry pre-encoded signatures, so building these
/// large raw frames for MJPEG would delay the synthetic capture clock.
struct RawProbeFrames {
    // Baseline (no-pulse) signature frame.
    dark_frame: Vec<u8>,
    // Signature shown for an ordinary pulse.
    regular_pulse_frame: Vec<u8>,
    // Signature shown for a marker pulse.
    marker_pulse_frame: Vec<u8>,
    // One pre-built frame per event width code in the schedule.
    coded_pulse_frames: BTreeMap<u32, Vec<u8>>,
}
impl RawProbeFrames {
    /// Build raw frames only when the packet source still needs them.
    ///
    /// Inputs: packet source, camera profile, and pulse schedule.
    /// Outputs: raw RGB frames for live encoders or `None` for pre-encoded MJPEG.
    /// Why: client-origin freshness must not include one-time test-pattern
    /// setup work that would never happen inside physical camera capture.
    fn maybe_new(
        packet_source: &VideoPacketSource,
        camera: CameraConfig,
        schedule: &PulseSchedule,
    ) -> Option<Self> {
        // Only live-pipeline sources consume raw frames.
        let VideoPacketSource::Pipeline { .. } = packet_source else {
            return None;
        };
        let (width, height) = (camera.width as usize, camera.height as usize);
        let coded_pulse_frames: BTreeMap<u32, Vec<u8>> = schedule
            .event_width_codes()
            .iter()
            .map(|&code| (code, build_coded_probe_frame(width, height, code)))
            .collect();
        Some(Self {
            dark_frame: build_dark_probe_frame(width, height),
            regular_pulse_frame: build_regular_probe_frame(width, height),
            marker_pulse_frame: build_marker_probe_frame(width, height),
            coded_pulse_frames,
        })
    }

    /// Return the raw RGB frame for one probe frame kind.
    ///
    /// Inputs: selected frame kind from the shared pulse schedule.
    /// Outputs: borrowed raw frame bytes.
    /// Why: H.264 encoding still needs the same visual signatures as MJPEG so
    /// analyzer identity stays comparable across codecs.
    fn frame(&self, frame_kind: ProbeFrameKind) -> &[u8] {
        match frame_kind {
            ProbeFrameKind::Dark => &self.dark_frame,
            ProbeFrameKind::RegularPulse => &self.regular_pulse_frame,
            ProbeFrameKind::MarkerPulse => &self.marker_pulse_frame,
            // A code absent from the map falls back to the regular pulse.
            ProbeFrameKind::Coded(code) => self
                .coded_pulse_frames
                .get(&code)
                .map(Vec::as_slice)
                .unwrap_or(self.regular_pulse_frame.as_slice()),
        }
    }
}
/// Spawn the paced synthetic video capture thread.
///
/// Inputs: the full video thread configuration.
/// Outputs: the join handle for the capture thread.
/// Why: frames must be emitted on the probe's monotonic clock so their PTS
/// reflects real pacing, not as-fast-as-possible generation.
fn spawn_video_thread(config: VideoThreadConfig) -> JoinHandle<()> {
    let VideoThreadConfig {
        mut packet_source,
        camera,
        schedule,
        duration,
        probe_start,
        running,
        queue,
    } = config;
    thread::spawn(move || {
        // Raw frames exist only for live-encoder sources; MJPEG stays `None`.
        let raw_frames = RawProbeFrames::maybe_new(&packet_source, camera, &schedule);
        let fps = u64::from(camera.fps.max(1));
        let frame_step = Duration::from_nanos(1_000_000_000u64 / fps);
        let mut frame_index = 0u64;
        while running.load(Ordering::Acquire) {
            let pts = schedule.frame_pts(frame_index, camera.fps.max(1));
            if pts > duration {
                break;
            }
            // Sleep until this frame's deadline on the probe's monotonic clock.
            let deadline = probe_start + pts;
            if let Some(wait) = deadline.checked_duration_since(Instant::now())
                && !wait.is_zero()
            {
                thread::sleep(wait);
            }
            let frame_kind = probe_frame_kind(&schedule, pts);
            // Pre-encoded sources get an empty raw slice.
            let raw_frame: &[u8] = match raw_frames.as_ref() {
                Some(frames) => frames.frame(frame_kind),
                None => &[],
            };
            if let Some(encoded) =
                video_packet_data(&mut packet_source, frame_kind, raw_frame, pts, frame_step)
            {
                let source_pts_us = encoded.pts.as_micros().min(u64::MAX as u128) as u64;
                let packet = VideoPacket {
                    id: 2,
                    pts: source_pts_us,
                    data: encoded.data,
                    ..Default::default()
                };
                // Zero timeout: never let queue pressure stall the pacing loop.
                let _ = queue.push(packet, Duration::ZERO);
            }
            frame_index = frame_index.saturating_add(1);
        }
        stop_video_packet_source(packet_source);
        queue.close();
    })
}
/// Spawn the paced synthetic audio capture thread.
///
/// Inputs: pulse schedule, total probe duration, monotonic pacing origin,
/// shared stop flag, and the destination packet queue.
/// Outputs: the join handle for the audio thread.
/// Why: audio chunks must be emitted on the same monotonic clock as video so
/// downstream sync analysis can compare their PTS directly.
fn spawn_audio_thread(
    schedule: PulseSchedule,
    duration: Duration,
    probe_start: Instant,
    running: Arc<AtomicBool>,
    queue: FreshPacketQueue<AudioPacket>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        let chunk_duration = Duration::from_millis(AUDIO_CHUNK_MS);
        let samples_per_chunk =
            (AUDIO_SAMPLE_RATE as usize * AUDIO_CHUNK_MS as usize / 1_000).max(1);
        let mut chunk_index = 0u64;
        while running.load(Ordering::Acquire) {
            // Saturate instead of the previous truncating `chunk_index as u32`
            // cast: truncation would wrap the PTS back toward zero past
            // u32::MAX chunks, defeating the `pts > duration` exit check,
            // while saturation still terminates through it.
            let chunk_count = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            let pts = chunk_duration.saturating_mul(chunk_count);
            if pts > duration {
                break;
            }
            // Sleep until this chunk's deadline on the probe's monotonic clock.
            let deadline = probe_start + pts;
            if let Some(remaining) = deadline.checked_duration_since(Instant::now())
                && !remaining.is_zero()
            {
                thread::sleep(remaining);
            }
            let chunk = render_audio_chunk(&schedule, pts, samples_per_chunk);
            let source_pts_us = pts.as_micros().min(u64::MAX as u128) as u64;
            let packet = AudioPacket {
                id: 0,
                pts: source_pts_us,
                frame_duration_us: AUDIO_CHUNK_MS.saturating_mul(1_000) as u32,
                data: chunk,
                ..Default::default()
            };
            // Zero timeout: never let queue pressure stall the pacing loop.
            let _ = queue.push(packet, Duration::ZERO);
            chunk_index = chunk_index.saturating_add(1);
        }
        queue.close();
    })
}