crates/coldvox-stt/src/processor.rs (96 changes: 31 additions & 65 deletions)
@@ -31,14 +31,12 @@ use tracing::{debug, error, info, warn};
 pub enum UtteranceState {
     /// No speech detected
     Idle,
-    /// Speech is active, buffering audio
+    /// Speech is active, streaming audio
     SpeechActive {
         /// Timestamp when speech started
         started_at: Instant,
-        /// Buffered audio frames for this utterance
-        audio_buffer: Vec<i16>,
-        /// Number of frames buffered
-        frames_buffered: u64,
+        /// Number of frames processed
+        frames_processed: u64,
     },
 }

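The slimmed `SpeechActive` variant now holds only `Copy` data (`Instant` and `u64`), which is what lets `handle_speech_end` further down read the state by value (`} = self.state`) instead of borrowing it. A minimal sketch of that pattern, assuming the enum derives `Copy` (the derive itself is outside this diff):

```rust
use std::time::Instant;

// Assumed shape; the real enum lives in crates/coldvox-stt/src/processor.rs.
#[derive(Clone, Copy)]
enum UtteranceState {
    Idle,
    SpeechActive { started_at: Instant, frames_processed: u64 },
}

fn frames_so_far(state: UtteranceState) -> u64 {
    // With Copy, the state can be matched by value without moving
    // or borrowing the field it was read from.
    match state {
        UtteranceState::SpeechActive { frames_processed, .. } => frames_processed,
        UtteranceState::Idle => 0,
    }
}
```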
@@ -127,7 +125,7 @@ impl<T: StreamingStt + Send> SttProcessor<T> {

                // Listen for audio frames
                Ok(frame) = self.audio_rx.recv() => {
-                    self.handle_audio_frame(frame);
+                    self.handle_audio_frame(frame).await;
                }

                else => {
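Since `handle_audio_frame` is now async and awaited inside the `tokio::select!` arm, the engine call runs inline with the event loop: nothing else is polled until it completes, and with a broadcast-style channel (implied by `Ok(frame) = self.audio_rx.recv()`) a slow engine risks lagging behind the producer. A minimal sketch of the pattern, with `AudioFrame` and the channel type assumed rather than taken from the crate:

```rust
use tokio::sync::broadcast;

#[derive(Clone)]
struct AudioFrame {
    data: Vec<i16>,
}

async fn run(mut audio_rx: broadcast::Receiver<AudioFrame>) {
    loop {
        tokio::select! {
            Ok(frame) = audio_rx.recv() => {
                // Awaiting here serializes frame handling with the loop:
                // no new frame is received until this call returns.
                handle_audio_frame(frame).await;
            }
            else => break,
        }
    }
}

async fn handle_audio_frame(frame: AudioFrame) {
    // Stand-in for the processor's streaming path.
    let _ = frame.data.len();
}
```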
@@ -160,10 +158,7 @@ impl<T: StreamingStt + Send> SttProcessor<T> {

        self.state = UtteranceState::SpeechActive {
            started_at: start_instant,
-            audio_buffer: Vec::with_capacity(
-                SAMPLE_RATE_HZ as usize * DEFAULT_BUFFER_DURATION_SECONDS,
-            ),
-            frames_buffered: 0,
+            frames_processed: 0,
        };

        // Reset STT engine for new utterance
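The deleted `Vec::with_capacity` line sized the per-utterance buffer from two constants defined outside this diff. A back-of-envelope sketch, assuming 16 kHz audio and a 30-second default duration (both values are assumptions, not taken from the crate), shows what the streaming path stops pre-allocating:

```rust
// Assumed values; the real constants are defined elsewhere in coldvox-stt.
const SAMPLE_RATE_HZ: u32 = 16_000;
const DEFAULT_BUFFER_DURATION_SECONDS: usize = 30;

fn main() {
    let samples = SAMPLE_RATE_HZ as usize * DEFAULT_BUFFER_DURATION_SECONDS;
    let bytes = samples * std::mem::size_of::<i16>();
    // 480000 samples, 960000 bytes (~0.92 MiB) per utterance
    println!("{samples} samples, {bytes} bytes");
}
```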
@@ -173,87 +168,58 @@ impl<T: StreamingStt + Send> SttProcessor<T> {
    }

    /// Handle speech end event
-    async fn handle_speech_end(&mut self, _timestamp_ms: u64, _duration_ms: Option<u64>) {
-        debug!(target: "stt", "Starting handle_speech_end()");
-
-        // Process the buffered audio all at once
+    async fn handle_speech_end(&mut self, _timestamp_ms: u64, duration_ms: Option<u64>) {
        if let UtteranceState::SpeechActive {
-            audio_buffer,
-            frames_buffered,
-            ..
-        } = &self.state
+            frames_processed, ..
+        } = self.state
        {
-            let buffer_size = audio_buffer.len();
            info!(
                target: "stt",
-                "Processing buffered audio: {} samples ({:.2}s), {} frames",
-                buffer_size,
-                buffer_size as f32 / SAMPLE_RATE_HZ as f32,
-                frames_buffered
+                "Speech ended after {}ms, processed {} frames. Finalizing transcription.",
+                duration_ms.unwrap_or(0),
+                frames_processed
            );

-            if !audio_buffer.is_empty() {
-                // Send the entire buffer to the STT engine
-                // Stream model expects per-frame feeding; here we feed the whole buffered audio
-                // in chunks to preserve event semantics.
-                for chunk in audio_buffer.chunks(DEFAULT_CHUNK_SIZE_SAMPLES) {
-                    // Process in 1-second chunks
-                    if let Some(event) = self.stt_engine.on_speech_frame(chunk).await {
-                        self.send_event(event).await;
-                    }
-                }
-                debug!(target: "stt", "Finished streaming frames to STT engine");
-                let mut metrics = self.metrics.write();
-                metrics.frames_out += frames_buffered;
-                metrics.last_event_time = Some(Instant::now());
-            }

-            // Finalize to get any remaining transcription
-            let result = self.stt_engine.on_speech_end().await;
-            match result {
-                Some(event) => {
-                    debug!(target: "stt", "STT engine returned Final event: {:?}", event);
-                    self.send_event(event).await;
-                    let mut metrics = self.metrics.write();
-                    metrics.final_count += 1;
-                    metrics.last_event_time = Some(Instant::now());
-                }
-                None => {
-                    debug!(target: "stt", "STT engine returned None on speech end");
-                }
+            if let Some(event) = self.stt_engine.on_speech_end().await {
+                self.send_event(event).await;
            }
        }

        self.state = UtteranceState::Idle;
    }

-    /// Handle incoming audio frame
-    fn handle_audio_frame(&mut self, frame: AudioFrame) {
+    /// Handle incoming audio frame by streaming it to the STT engine
+    async fn handle_audio_frame(&mut self, frame: AudioFrame) {
        // Update metrics
        self.metrics.write().frames_in += 1;

-        // Only buffer if speech is active
+        let mut event_to_send = None;
+        // Only process if speech is active
        if let UtteranceState::SpeechActive {
-            ref mut audio_buffer,
-            ref mut frames_buffered,
+            ref mut frames_processed,
            ..
        } = &mut self.state
        {
-            // Buffer the audio frame (already i16 PCM)
-            audio_buffer.extend_from_slice(&frame.data);
-            *frames_buffered += 1;
+            // Stream the audio frame to the STT engine
+            event_to_send = self.stt_engine.on_speech_frame(&frame.data).await;

-            // Log periodically to show we're buffering
-            if *frames_buffered % LOGGING_INTERVAL_FRAMES == 0 {
+            *frames_processed += 1;
+            self.metrics.write().frames_out += 1;
+
+            // Log periodically to show we're processing
+            if *frames_processed % LOGGING_INTERVAL_FRAMES == 0 {
                debug!(
                    target: "stt",
-                    "Buffering audio: {} frames, {} samples ({:.2}s)",
-                    frames_buffered,
-                    audio_buffer.len(),
-                    audio_buffer.len() as f32 / SAMPLE_RATE_HZ as f32
+                    "Processed {} audio frames for current utterance",
+                    frames_processed,
                );
            }
        }
+
+        if let Some(event) = event_to_send {
+            self.send_event(event).await;
+        }
    }

    /// Send transcription event
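For readers tracing the new call sites: this diff touches three points on the engine, `on_speech_frame` per frame, `on_speech_end` at utterance end, and a reset at utterance start. A hypothetical sketch of a `StreamingStt` contract consistent with those calls, assuming Rust 1.75+ async-fn-in-trait and a stand-in `TranscriptionEvent` (the real trait and event type live elsewhere in `coldvox-stt` and may differ):

```rust
/// Stand-in for the crate's transcription event type.
#[derive(Debug)]
pub enum TranscriptionEvent {
    Partial(String),
    Final(String),
}

/// Hypothetical surface inferred from the calls visible in this diff.
pub trait StreamingStt {
    /// Feed one frame of i16 PCM while speech is active;
    /// may yield a partial result.
    async fn on_speech_frame(&mut self, samples: &[i16]) -> Option<TranscriptionEvent>;

    /// Finalize the current utterance; may yield a final result.
    async fn on_speech_end(&mut self) -> Option<TranscriptionEvent>;

    /// Clear engine state before a new utterance.
    async fn reset(&mut self);
}
```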