11// Copyright (c) Microsoft Corporation.
22// Licensed under the MIT License.
33
4- use diag_client:: kmsg_stream:: KmsgStream ;
54use fs_err:: File ;
65use fs_err:: PathExt ;
76use futures:: AsyncBufReadExt ;
@@ -305,24 +304,41 @@ impl<'a> MakeWriter<'a> for PetriWriter {
305304}
306305
307306/// Logs lines from `reader` into `log_file`.
308- pub async fn log_stream (
307+ ///
308+ /// Attempts to parse lines as `SyslogParsedEntry`, extracting the log level.
309+ /// Passes through any non-conforming logs.
310+ pub async fn log_task (
309311 log_file : PetriLogFile ,
310312 reader : impl AsyncRead + Unpin + Send + ' static ,
313+ name : & str ,
311314) -> anyhow:: Result < ( ) > {
315+ tracing:: info!( "connected to {name}" ) ;
312316 let mut buf = Vec :: new ( ) ;
313317 let mut reader = BufReader :: new ( reader) ;
314318 loop {
315319 buf. clear ( ) ;
316- let n = ( & mut reader) . take ( 256 ) . read_until ( b'\n' , & mut buf) . await ?;
317- if n == 0 {
318- break ;
320+ match ( & mut reader) . take ( 256 ) . read_until ( b'\n' , & mut buf) . await {
321+ Ok ( 0 ) => {
322+ tracing:: info!( "disconnected from {name}: EOF" ) ;
323+ return Ok ( ( ) ) ;
324+ }
325+ Err ( e) => {
326+ tracing:: info!( "disconnected from {name}: error: {e:#}" ) ;
327+ return Err ( e. into ( ) ) ;
328+ }
329+ _ => { }
319330 }
320331
321332 let string_buf = String :: from_utf8_lossy ( & buf) ;
322333 let string_buf_trimmed = string_buf. trim_end ( ) ;
323- log_file. write_entry ( string_buf_trimmed) ;
334+
335+ if let Some ( message) = kmsg:: SyslogParsedEntry :: new ( string_buf_trimmed) {
336+ let level = kernel_level_to_tracing_level ( message. level ) ;
337+ log_file. write_entry_fmt ( None , level, format_args ! ( "{}" , message. display( false ) ) ) ;
338+ } else {
339+ log_file. write_entry ( string_buf_trimmed) ;
340+ }
324341 }
325- Ok ( ( ) )
326342}
327343
328344/// Maps kernel log levels to tracing levels.
@@ -339,23 +355,30 @@ fn kernel_level_to_tracing_level(kernel_level: u8) -> Level {
339355/// read from the kmsg stream and write entries to the log
340356pub async fn kmsg_log_task (
341357 log_file : PetriLogFile ,
342- mut file_stream : KmsgStream ,
358+ diag_client : diag_client :: DiagClient ,
343359) -> anyhow:: Result < ( ) > {
344- while let Some ( data) = file_stream. next ( ) . await {
345- match data {
346- Ok ( data) => {
347- let message = KmsgParsedEntry :: new ( & data) . unwrap ( ) ;
348- let level = kernel_level_to_tracing_level ( message. level ) ;
349- log_file. write_entry_fmt ( None , level, format_args ! ( "{}" , message. display( false ) ) ) ;
350- }
351- Err ( err) => {
352- tracing:: info!( "kmsg disconnected: {err:?}" ) ;
353- break ;
360+ loop {
361+ diag_client. wait_for_server ( ) . await ?;
362+ let mut kmsg = diag_client. kmsg ( true ) . await ?;
363+ tracing:: info!( "kmsg connected" ) ;
364+ while let Some ( data) = kmsg. next ( ) . await {
365+ match data {
366+ Ok ( data) => {
367+ let message = KmsgParsedEntry :: new ( & data) . unwrap ( ) ;
368+ let level = kernel_level_to_tracing_level ( message. level ) ;
369+ log_file. write_entry_fmt (
370+ None ,
371+ level,
372+ format_args ! ( "{}" , message. display( false ) ) ,
373+ ) ;
374+ }
375+ Err ( err) => {
376+ tracing:: info!( "kmsg disconnected: {err:#}" ) ;
377+ break ;
378+ }
354379 }
355380 }
356381 }
357-
358- Ok ( ( ) )
359382}
360383
361384#[ cfg( test) ]
0 commit comments