55//! to work with any VAD system and any STT implementation.
66
77use crate :: constants:: * ;
8- use crate :: types:: { SttMetrics , TranscriptionConfig , TranscriptionEvent } ;
8+ use crate :: types:: { TranscriptionConfig , TranscriptionEvent } ;
99use crate :: StreamingStt ;
10+ use coldvox_telemetry:: SttPerformanceMetrics ;
1011/// Minimal audio frame type (i16 PCM) used by the generic STT processor
1112#[ derive( Debug , Clone ) ]
1213pub struct AudioFrame {
@@ -21,10 +22,9 @@ pub enum VadEvent {
2122 SpeechStart { timestamp_ms : u64 } ,
2223 SpeechEnd { timestamp_ms : u64 , duration_ms : u64 } ,
2324}
24- use std:: sync:: Arc ;
2525use std:: time:: Instant ;
2626use tokio:: sync:: { broadcast, mpsc} ;
27- use tracing:: { debug, error , info, warn } ;
27+ use tracing:: { debug, info} ;
2828
2929/// STT processor state
3030#[ derive( Debug , Clone ) ]
@@ -55,7 +55,7 @@ pub struct SttProcessor<T: StreamingStt> {
5555 /// Current utterance state
5656 state : UtteranceState ,
5757 /// Metrics
58- metrics : Arc < parking_lot :: RwLock < SttMetrics > > ,
58+ metrics : SttPerformanceMetrics ,
5959 /// Configuration
6060 config : TranscriptionConfig ,
6161}
@@ -80,14 +80,14 @@ impl<T: StreamingStt + Send> SttProcessor<T> {
8080 event_tx,
8181 stt_engine,
8282 state : UtteranceState :: Idle ,
83- metrics : Arc :: new ( parking_lot :: RwLock :: new ( SttMetrics :: default ( ) ) ) ,
83+ metrics : SttPerformanceMetrics :: new ( ) ,
8484 config,
8585 }
8686 }
8787
8888 /// Get current metrics
89- pub fn metrics ( & self ) -> SttMetrics {
90- self . metrics . read ( ) . clone ( )
89+ pub fn metrics ( & self ) -> SttPerformanceMetrics {
90+ self . metrics . clone ( )
9191 }
9292
9393 /// Run the STT processor loop
@@ -113,6 +113,7 @@ impl<T: StreamingStt + Send> SttProcessor<T> {
113113 tokio:: select! {
114114 // Listen for VAD events
115115 Some ( event) = self . vad_event_rx. recv( ) => {
116+ self . metrics. increment_requests( ) ;
116117 match event {
117118 VadEvent :: SpeechStart { timestamp_ms } => {
118119 debug!( target: "stt" , "Received SpeechStart event @ {}ms" , timestamp_ms) ;
@@ -127,6 +128,7 @@ impl<T: StreamingStt + Send> SttProcessor<T> {
127128
128129 // Listen for audio frames
129130 Ok ( frame) = self . audio_rx. recv( ) => {
131+ self . metrics. increment_requests( ) ;
130132 self . handle_audio_frame( frame) ;
131133 }
132134
@@ -136,18 +138,15 @@ impl<T: StreamingStt + Send> SttProcessor<T> {
136138 }
137139 }
138140 }
139-
140141 // Log final metrics
141- let metrics = self . metrics . read ( ) ;
142+ let ( _ , accuracy , _ , operational ) = self . metrics . snapshot ( ) ;
142143 info ! (
143144 target: "stt" ,
144- "STT processor final stats - frames in: {}, out: {}, dropped: {}, partials: {}, finals: {}, errors: {}" ,
145- metrics. frames_in,
146- metrics. frames_out,
147- metrics. frames_dropped,
148- metrics. partial_count,
149- metrics. final_count,
150- metrics. error_count
145+ "STT processor final stats - requests: {}, partials: {}, finals: {}, errors: {}" ,
146+ operational. request_count,
147+ accuracy. partial_count,
148+ accuracy. final_count,
149+ operational. error_count
151150 ) ;
152151 }
153152
@@ -175,50 +174,27 @@ impl<T: StreamingStt + Send> SttProcessor<T> {
175174 /// Handle speech end event
176175 async fn handle_speech_end ( & mut self , _timestamp_ms : u64 , _duration_ms : Option < u64 > ) {
177176 debug ! ( target: "stt" , "Starting handle_speech_end()" ) ;
177+ let _guard = coldvox_telemetry:: TimingGuard :: new (
178+ & self . metrics ,
179+ |m, d| m. record_end_to_end_latency ( d)
180+ ) ;
178181
179- // Process the buffered audio all at once
180- if let UtteranceState :: SpeechActive {
181- audio_buffer,
182- frames_buffered,
183- ..
184- } = & self . state
185- {
186- let buffer_size = audio_buffer. len ( ) ;
187- info ! (
188- target: "stt" ,
189- "Processing buffered audio: {} samples ({:.2}s), {} frames" ,
190- buffer_size,
191- buffer_size as f32 / SAMPLE_RATE_HZ as f32 ,
192- frames_buffered
193- ) ;
194-
182+ if let UtteranceState :: SpeechActive { audio_buffer, .. } = & self . state {
195183 if !audio_buffer. is_empty ( ) {
196- // Send the entire buffer to the STT engine
197- // Stream model expects per-frame feeding; here we feed the whole buffered audio
198- // in chunks to preserve event semantics.
199184 for chunk in audio_buffer. chunks ( DEFAULT_CHUNK_SIZE_SAMPLES ) {
200- // Process in 1-second chunks
201185 if let Some ( event) = self . stt_engine . on_speech_frame ( chunk) . await {
202186 self . send_event ( event) . await ;
203187 }
204188 }
205- debug ! ( target: "stt" , "Finished streaming frames to STT engine" ) ;
206- let mut metrics = self . metrics . write ( ) ;
207- metrics. frames_out += frames_buffered;
208- metrics. last_event_time = Some ( Instant :: now ( ) ) ;
209189 }
210190
211- // Finalize to get any remaining transcription
212- let result = self . stt_engine . on_speech_end ( ) . await ;
213- match result {
191+ match self . stt_engine . on_speech_end ( ) . await {
214192 Some ( event) => {
215- debug ! ( target : "stt" , "STT engine returned Final event: {:?}" , event ) ;
193+ self . metrics . record_transcription_success ( ) ;
216194 self . send_event ( event) . await ;
217- let mut metrics = self . metrics . write ( ) ;
218- metrics. final_count += 1 ;
219- metrics. last_event_time = Some ( Instant :: now ( ) ) ;
220195 }
221196 None => {
197+ self . metrics . record_transcription_failure ( ) ;
222198 debug ! ( target: "stt" , "STT engine returned None on speech end" ) ;
223199 }
224200 }
@@ -229,72 +205,30 @@ impl<T: StreamingStt + Send> SttProcessor<T> {
229205
230206 /// Handle incoming audio frame
231207 fn handle_audio_frame ( & mut self , frame : AudioFrame ) {
232- // Update metrics
233- self . metrics . write ( ) . frames_in += 1 ;
234-
235- // Only buffer if speech is active
236- if let UtteranceState :: SpeechActive {
237- ref mut audio_buffer,
238- ref mut frames_buffered,
239- ..
240- } = & mut self . state
241- {
242- // Buffer the audio frame (already i16 PCM)
208+ if let UtteranceState :: SpeechActive { audio_buffer, .. } = & mut self . state {
243209 audio_buffer. extend_from_slice ( & frame. data ) ;
244- * frames_buffered += 1 ;
245-
246- // Log periodically to show we're buffering
247- if * frames_buffered % LOGGING_INTERVAL_FRAMES == 0 {
248- debug ! (
249- target: "stt" ,
250- "Buffering audio: {} frames, {} samples ({:.2}s)" ,
251- frames_buffered,
252- audio_buffer. len( ) ,
253- audio_buffer. len( ) as f32 / SAMPLE_RATE_HZ as f32
254- ) ;
255- }
210+ let utilization = ( audio_buffer. len ( ) * 100 ) / audio_buffer. capacity ( ) ;
211+ self . metrics . update_buffer_utilization ( utilization as u64 ) ;
256212 }
257213 }
258214
259215 /// Send transcription event
260216 async fn send_event ( & self , event : TranscriptionEvent ) {
261- // Log the event
262217 match & event {
263- TranscriptionEvent :: Partial { text, .. } => {
264- info ! ( target: "stt" , "Partial: {}" , text) ;
265- self . metrics . write ( ) . partial_count += 1 ;
266- }
267- TranscriptionEvent :: Final { text, words, .. } => {
268- let word_count = words. as_ref ( ) . map ( |w| w. len ( ) ) . unwrap_or ( 0 ) ;
269- info ! ( target: "stt" , "Final: {} (words: {})" , text, word_count) ;
270- self . metrics . write ( ) . final_count += 1 ;
271- }
272- TranscriptionEvent :: Error { code, message } => {
273- error ! ( target: "stt" , "Error [{}]: {}" , code, message) ;
274- self . metrics . write ( ) . error_count += 1 ;
275- }
218+ TranscriptionEvent :: Partial { .. } => self . metrics . record_partial_transcription ( ) ,
219+ TranscriptionEvent :: Final { .. } => self . metrics . record_final_transcription ( ) ,
220+ TranscriptionEvent :: Error { .. } => self . metrics . record_error ( ) ,
276221 }
277222
278- // Send to channel with backpressure - wait if channel is full
279- // Use timeout to prevent indefinite blocking
280- match tokio:: time:: timeout (
281- std:: time:: Duration :: from_secs ( SEND_TIMEOUT_SECONDS ) ,
223+ if tokio:: time:: timeout (
224+ std:: time:: Duration :: from_secs ( 5 ) ,
282225 self . event_tx . send ( event) ,
283226 )
284227 . await
228+ . is_err ( )
285229 {
286- Ok ( Ok ( ( ) ) ) => {
287- // Successfully sent
288- }
289- Ok ( Err ( _) ) => {
290- // Channel closed
291- debug ! ( target: "stt" , "Event channel closed" ) ;
292- }
293- Err ( _) => {
294- // Timeout - consumer is too slow
295- warn ! ( target: "stt" , "Event channel send timed out after 5s - consumer too slow" ) ;
296- self . metrics . write ( ) . frames_dropped += 1 ;
297- }
230+ self . metrics . record_error ( ) ;
231+ debug ! ( target: "stt" , "Event channel closed or send timed out" ) ;
298232 }
299233 }
300234}
0 commit comments