-
Notifications
You must be signed in to change notification settings - Fork 5
fail to send logging #1255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fail to send logging #1255
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -74,7 +74,7 @@ async fn process_incoming_message( | |
| Message::Text(text) => { | ||
| // Handle heartbeat ping messages | ||
| if text == "ping" { | ||
| ws_send(message_sender, FromWebSocketMessage::Pong); | ||
| ws_send(message_sender, FromWebSocketMessage::Pong)?; | ||
| return Ok(None); | ||
| } | ||
|
|
||
|
|
@@ -192,36 +192,39 @@ async fn handle_websocket_connection( | |
| if let Err(err) = | ||
| ensure_user_attachment_access(&ctx, user_context.clone(), attachments).await | ||
| { | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err)); | ||
| if let Err(send_err) = | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err)) | ||
| { | ||
| tracing::error!(error=?send_err, "failed to send attachment access error"); | ||
| } | ||
| continue; | ||
| } | ||
|
|
||
| match incoming_message { | ||
| ToWebSocketMessage::SendChatMessage(payload) => { | ||
| if let Err(e) = check_user_quota(&ctx, &user_context).await { | ||
| match e { | ||
| let send_result = match e { | ||
| UserQuotaError::InvalidMacroUserId | ||
| | UserQuotaError::UnableToGetUserPermissions | ||
| | UserQuotaError::UnableToGetUserQuota => { | ||
| ws_send( | ||
| &message_sender_clone, | ||
| FromWebSocketMessage::Error(WebSocketError::StreamError( | ||
| StreamError::InternalError { | ||
| stream_id: payload.stream_id, | ||
| }, | ||
| )), | ||
| ); | ||
| } | ||
| UserQuotaError::ExceededMaxChatMessages => { | ||
| ws_send( | ||
| &message_sender_clone, | ||
| FromWebSocketMessage::Error(WebSocketError::StreamError( | ||
| StreamError::PaymentRequired { | ||
| stream_id: payload.stream_id, | ||
| }, | ||
| )), | ||
| ); | ||
| } | ||
| | UserQuotaError::UnableToGetUserQuota => ws_send( | ||
| &message_sender_clone, | ||
| FromWebSocketMessage::Error(WebSocketError::StreamError( | ||
| StreamError::InternalError { | ||
| stream_id: payload.stream_id, | ||
| }, | ||
| )), | ||
| ), | ||
| UserQuotaError::ExceededMaxChatMessages => ws_send( | ||
| &message_sender_clone, | ||
| FromWebSocketMessage::Error(WebSocketError::StreamError( | ||
| StreamError::PaymentRequired { | ||
| stream_id: payload.stream_id, | ||
| }, | ||
| )), | ||
| ), | ||
| }; | ||
| if let Err(send_err) = send_result { | ||
|
||
| tracing::error!(error=?send_err, "failed to send user quota error"); | ||
| } | ||
| continue; | ||
| } | ||
|
|
@@ -230,14 +233,16 @@ async fn handle_websocket_connection( | |
| { | ||
| Ok(id) => Arc::new(id), | ||
| Err(_) => { | ||
| ws_send( | ||
| if let Err(send_err) = ws_send( | ||
|
||
| &message_sender_clone, | ||
| FromWebSocketMessage::Error(WebSocketError::StreamError( | ||
| StreamError::InternalError { | ||
| stream_id: payload.stream_id.clone(), | ||
| }, | ||
| )), | ||
| ); | ||
| ) { | ||
| tracing::error!(error=?send_err, "failed to send user id error"); | ||
| } | ||
| continue; | ||
| } | ||
| }; | ||
|
|
@@ -254,19 +259,25 @@ async fn handle_websocket_connection( | |
| .await | ||
| { | ||
| Err(e) => { | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(e)); | ||
| if let Err(send_err) = | ||
|
||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(e)) | ||
| { | ||
| tracing::error!(error=?send_err, "failed to send chat access error"); | ||
| } | ||
| continue; | ||
| } | ||
| Ok(access) => match access { | ||
| AccessLevel::View | AccessLevel::Comment => { | ||
| ws_send( | ||
| if let Err(send_err) = ws_send( | ||
|
||
| &message_sender_clone, | ||
| FromWebSocketMessage::Error(WebSocketError::StreamError( | ||
| StreamError::Unauthorized { | ||
| stream_id: payload.stream_id, | ||
| }, | ||
| )), | ||
| ); | ||
| ) { | ||
| tracing::error!(error=?send_err, "failed to send unauthorized error"); | ||
| } | ||
| continue; | ||
| } | ||
| _ => (), | ||
|
|
@@ -285,8 +296,10 @@ async fn handle_websocket_connection( | |
| &connection_id, | ||
| &jwt_token | ||
| )) => { | ||
| if let Err(err) = result { | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err.into())) | ||
| if let Err(err) = result | ||
| && let Err(send_err) = ws_send(&message_sender_clone, FromWebSocketMessage::Error(err.into())) | ||
| { | ||
| tracing::error!(error=?send_err, "failed to send chat message error"); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -297,14 +310,16 @@ async fn handle_websocket_connection( | |
| { | ||
| Ok(id) => Arc::new(id), | ||
| Err(_) => { | ||
| ws_send( | ||
| if let Err(send_err) = ws_send( | ||
|
||
| &message_sender_clone, | ||
| FromWebSocketMessage::Error(WebSocketError::StreamError( | ||
| StreamError::InternalError { | ||
| stream_id: payload.stream_id.clone(), | ||
| }, | ||
| )), | ||
| ); | ||
| ) { | ||
| tracing::error!(error=?send_err, "failed to send user id error"); | ||
| } | ||
| continue; | ||
| } | ||
| }; | ||
|
|
@@ -321,19 +336,25 @@ async fn handle_websocket_connection( | |
| .await | ||
| { | ||
| Err(e) => { | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(e)); | ||
| if let Err(send_err) = | ||
|
||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(e)) | ||
| { | ||
| tracing::error!(error=?send_err, "failed to send chat access error"); | ||
| } | ||
| continue; | ||
| } | ||
| Ok(access) => match access { | ||
| AccessLevel::View | AccessLevel::Comment => { | ||
| ws_send( | ||
| if let Err(send_err) = ws_send( | ||
|
||
| &message_sender_clone, | ||
| FromWebSocketMessage::Error(WebSocketError::StreamError( | ||
| StreamError::Unauthorized { | ||
| stream_id: payload.stream_id, | ||
| }, | ||
| )), | ||
| ); | ||
| ) { | ||
| tracing::error!(error=?send_err, "failed to send unauthorized error"); | ||
| } | ||
| continue; | ||
| } | ||
| _ => (), | ||
|
|
@@ -349,11 +370,13 @@ async fn handle_websocket_connection( | |
| &jwt_token, | ||
| ) | ||
| .await; | ||
| if let Err(err) = result { | ||
| ws_send( | ||
| if let Err(err) = result | ||
|
||
| && let Err(send_err) = ws_send( | ||
| &message_sender_clone, | ||
| FromWebSocketMessage::Error(err.into()), | ||
| ) | ||
| { | ||
| tracing::error!(error=?send_err, "failed to send edit message error"); | ||
| } | ||
| }); | ||
| } | ||
|
|
@@ -368,8 +391,10 @@ async fn handle_websocket_connection( | |
| .map_err(|err| WebSocketError::FailedToSelectModel { | ||
| details: Some(err.to_string()), | ||
| }) | ||
| && let Err(send_err) = | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err)) | ||
| { | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err)); | ||
| tracing::error!(error=?send_err, "failed to send select model error"); | ||
| } | ||
| } | ||
| ToWebSocketMessage::ExtractionStatus(payload) => { | ||
|
|
@@ -382,8 +407,10 @@ async fn handle_websocket_connection( | |
| ) | ||
| .await | ||
| .map_err(|_err| WebSocketError::ExtractionStatusFailed { attachment_id }) | ||
| && let Err(send_err) = | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err)) | ||
| { | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err)) | ||
| tracing::error!(error=?send_err, "failed to send extraction status error"); | ||
| } | ||
| } | ||
| ToWebSocketMessage::SendCompletion(payload) => { | ||
|
|
@@ -395,21 +422,25 @@ async fn handle_websocket_connection( | |
| user_id, | ||
| ) | ||
| .await | ||
| && let Err(send_err) = | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err)) | ||
| { | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err)) | ||
| tracing::error!(error=?send_err, "failed to send completion error"); | ||
| } | ||
| } | ||
| ToWebSocketMessage::GetSimpleCompletionStream(payload) => { | ||
| let user_id = &user_context_clone.user_id; | ||
| let response = handle_simple_completion( | ||
| if let Err(err) = handle_simple_completion( | ||
| ctx_clone.clone(), | ||
| &message_sender_clone, | ||
| &payload, | ||
| user_id, | ||
| ) | ||
| .await; | ||
| if let Err(err) = response { | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err)); | ||
| .await | ||
| && let Err(send_err) = | ||
| ws_send(&message_sender_clone, FromWebSocketMessage::Error(err)) | ||
| { | ||
| tracing::error!(error=?send_err, "failed to send simple completion error"); | ||
| } | ||
| } | ||
| }; | ||
|
|
@@ -435,10 +466,13 @@ async fn handle_websocket_connection( | |
| CONNECTION_MAP.remove(&connection_id); | ||
| } | ||
|
|
||
| pub fn ws_send(sender: &UnboundedSender<FromWebSocketMessage>, message: FromWebSocketMessage) { | ||
| if let Err(e) = sender.send(message) { | ||
| tracing::error!(error=%e, "failed to send message to channel"); | ||
| } | ||
| pub fn ws_send( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you do |
||
| sender: &UnboundedSender<FromWebSocketMessage>, | ||
| message: FromWebSocketMessage, | ||
| ) -> anyhow::Result<()> { | ||
| sender | ||
| .send(message) | ||
| .map_err(|e| anyhow::anyhow!("failed to send message to channel: {}", e)) | ||
| } | ||
|
|
||
| /// Errors that can occur when checking the user quota | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inspect err