@@ -119,6 +119,72 @@ impl fmt::Display for ResponseData {
     }
 }
 
+#[cfg(feature = "with-tokio")]
+impl tokio::io::AsyncRead for ResponseDataStream {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        // Poll the stream for the next chunk of bytes
+        match Stream::poll_next(self.bytes.as_mut(), cx) {
+            std::task::Poll::Ready(Some(Ok(chunk))) => {
+                // Write as much of the chunk as fits in the buffer
+                let amt = std::cmp::min(chunk.len(), buf.remaining());
+                buf.put_slice(&chunk[..amt]);
+
+                // AIDEV-NOTE: Bytes that don't fit in the buffer are discarded from this chunk.
+                // This is expected AsyncRead behavior - consumers should use appropriately sized
+                // buffers or wrap in BufReader for efficiency with small reads.
+
+                std::task::Poll::Ready(Ok(()))
+            }
+            std::task::Poll::Ready(Some(Err(error))) => {
+                // Convert S3Error to io::Error
+                std::task::Poll::Ready(Err(std::io::Error::other(error)))
+            }
+            std::task::Poll::Ready(None) => {
+                // Stream is exhausted, signal EOF by returning Ok(()) with no bytes written
+                std::task::Poll::Ready(Ok(()))
+            }
+            std::task::Poll::Pending => std::task::Poll::Pending,
+        }
+    }
+}
+
+#[cfg(feature = "with-async-std")]
+impl async_std::io::Read for ResponseDataStream {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut [u8],
+    ) -> std::task::Poll<std::io::Result<usize>> {
+        // Poll the stream for the next chunk of bytes
+        match Stream::poll_next(self.bytes.as_mut(), cx) {
+            std::task::Poll::Ready(Some(Ok(chunk))) => {
+                // Write as much of the chunk as fits in the buffer
+                let amt = std::cmp::min(chunk.len(), buf.len());
+                buf[..amt].copy_from_slice(&chunk[..amt]);
+
+                // AIDEV-NOTE: Bytes that don't fit in the buffer are discarded from this chunk.
+                // This is expected AsyncRead behavior - consumers should use appropriately sized
+                // buffers or wrap in BufReader for efficiency with small reads.
+
+                std::task::Poll::Ready(Ok(amt))
+            }
+            std::task::Poll::Ready(Some(Err(error))) => {
+                // Convert S3Error to io::Error
+                std::task::Poll::Ready(Err(std::io::Error::other(error)))
+            }
+            std::task::Poll::Ready(None) => {
+                // Stream is exhausted, signal EOF by returning 0 bytes read
+                std::task::Poll::Ready(Ok(0))
+            }
+            std::task::Poll::Pending => std::task::Poll::Pending,
+        }
+    }
+}
+
 #[maybe_async::maybe_async]
 pub trait Request {
     type Response;
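
The AIDEV-NOTE above points consumers at BufReader for workloads that issue many small reads. A minimal sketch of that usage, assuming response chunks fit within BufReader's default 8 KiB buffer; the helper name and the 10-byte read are illustrative and not part of this change:

```rust
use tokio::io::{AsyncReadExt, BufReader};

// Illustrative helper: BufReader pulls a whole chunk (up to its 8 KiB buffer)
// from the stream in a single poll_read, then serves small reads out of that
// buffer instead of discarding the remainder of the chunk.
async fn read_prefix(stream: ResponseDataStream) -> std::io::Result<[u8; 10]> {
    let mut reader = BufReader::new(stream);
    let mut first_ten = [0u8; 10];
    reader.read_exact(&mut first_ten).await?;
    Ok(first_ten)
}
```

The async-std impl carries the same caveat; async_std::io::BufReader serves the same purpose on that path.
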
@@ -711,3 +777,227 @@ pub trait Request {
         Ok(headers)
     }
 }
+
+#[cfg(all(test, feature = "with-tokio"))]
+mod tests {
+    use super::*;
+    use bytes::Bytes;
+    use futures_util::stream;
+    use tokio::io::AsyncReadExt;
+
+    #[tokio::test]
+    async fn test_async_read_implementation() {
+        // Create a mock stream with test data
+        let chunks = vec![
+            Ok(Bytes::from("Hello, ")),
+            Ok(Bytes::from("World!")),
+            Ok(Bytes::from(" This is a test.")),
+        ];
+
+        let stream = stream::iter(chunks);
+        let data_stream: DataStream = Box::pin(stream);
+
+        let mut response_stream = ResponseDataStream {
+            bytes: data_stream,
+            status_code: 200,
+        };
+
+        // Read all data using AsyncRead
+        let mut buffer = Vec::new();
+        response_stream.read_to_end(&mut buffer).await.unwrap();
+
+        assert_eq!(buffer, b"Hello, World! This is a test.");
+    }
+
+    #[tokio::test]
+    async fn test_async_read_with_small_buffer() {
+        // Create a stream with a large chunk
+        let chunks = vec![
+            Ok(Bytes::from("This is a much longer string that won't fit in a small buffer")),
+        ];
+
+        let stream = stream::iter(chunks);
+        let data_stream: DataStream = Box::pin(stream);
+
+        let mut response_stream = ResponseDataStream {
+            bytes: data_stream,
+            status_code: 200,
+        };
+
+        // Read with a small buffer - demonstrates that excess bytes are discarded per chunk
+        let mut buffer = [0u8; 10];
+        let n = response_stream.read(&mut buffer).await.unwrap();
+
+        // We should only get the first 10 bytes
+        assert_eq!(n, 10);
+        assert_eq!(&buffer[..n], b"This is a ");
+
+        // Next read should get 0 bytes (EOF) because the chunk was consumed
+        let n = response_stream.read(&mut buffer).await.unwrap();
+        assert_eq!(n, 0);
+    }
+
+    #[tokio::test]
+    async fn test_async_read_with_error() {
+        use crate::error::S3Error;
+
+        // Create a stream that returns an error
+        let chunks: Vec<Result<Bytes, S3Error>> = vec![
+            Ok(Bytes::from("Some data")),
+            Err(S3Error::Io(std::io::Error::new(std::io::ErrorKind::Other, "Test error"))),
+        ];
+
+        let stream = stream::iter(chunks);
+        let data_stream: DataStream = Box::pin(stream);
+
+        let mut response_stream = ResponseDataStream {
+            bytes: data_stream,
+            status_code: 200,
+        };
+
+        // First read should succeed
+        let mut buffer = [0u8; 20];
+        let n = response_stream.read(&mut buffer).await.unwrap();
+        assert_eq!(n, 9);
+        assert_eq!(&buffer[..n], b"Some data");
+
+        // Second read should fail with an error
+        let result = response_stream.read(&mut buffer).await;
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_async_read_copy() {
+        // Test using tokio::io::copy which is a common use case
+        let chunks = vec![
+            Ok(Bytes::from("First chunk\n")),
+            Ok(Bytes::from("Second chunk\n")),
+            Ok(Bytes::from("Third chunk\n")),
+        ];
+
+        let stream = stream::iter(chunks);
+        let data_stream: DataStream = Box::pin(stream);
+
+        let mut response_stream = ResponseDataStream {
+            bytes: data_stream,
+            status_code: 200,
+        };
+
+        let mut output = Vec::new();
+        tokio::io::copy(&mut response_stream, &mut output).await.unwrap();
+
+        assert_eq!(output, b"First chunk\nSecond chunk\nThird chunk\n");
+    }
+}
+
+#[cfg(all(test, feature = "with-async-std"))]
+mod async_std_tests {
+    use super::*;
+    use async_std::io::ReadExt;
+    use bytes::Bytes;
+    use futures_util::stream;
+
+    #[async_std::test]
+    async fn test_async_read_implementation() {
+        // Create a mock stream with test data
+        let chunks = vec![
+            Ok(Bytes::from("Hello, ")),
+            Ok(Bytes::from("World!")),
+            Ok(Bytes::from(" This is a test.")),
+        ];
+
+        let stream = stream::iter(chunks);
+        let data_stream: DataStream = Box::pin(stream);
+
+        let mut response_stream = ResponseDataStream {
+            bytes: data_stream,
+            status_code: 200,
+        };
+
+        // Read all data using AsyncRead
+        let mut buffer = Vec::new();
+        response_stream.read_to_end(&mut buffer).await.unwrap();
+
+        assert_eq!(buffer, b"Hello, World! This is a test.");
+    }
+
+    #[async_std::test]
+    async fn test_async_read_with_small_buffer() {
+        // Create a stream with a large chunk
+        let chunks = vec![
+            Ok(Bytes::from("This is a much longer string that won't fit in a small buffer")),
+        ];
+
+        let stream = stream::iter(chunks);
+        let data_stream: DataStream = Box::pin(stream);
+
+        let mut response_stream = ResponseDataStream {
+            bytes: data_stream,
+            status_code: 200,
+        };
+
+        // Read with a small buffer - demonstrates that excess bytes are discarded per chunk
+        let mut buffer = [0u8; 10];
+        let n = response_stream.read(&mut buffer).await.unwrap();
+
+        // We should only get the first 10 bytes
+        assert_eq!(n, 10);
+        assert_eq!(&buffer[..n], b"This is a ");
+
+        // Next read should get 0 bytes (EOF) because the chunk was consumed
+        let n = response_stream.read(&mut buffer).await.unwrap();
+        assert_eq!(n, 0);
+    }
+
+    #[async_std::test]
+    async fn test_async_read_with_error() {
+        use crate::error::S3Error;
+
+        // Create a stream that returns an error
+        let chunks: Vec<Result<Bytes, S3Error>> = vec![
+            Ok(Bytes::from("Some data")),
+            Err(S3Error::Io(std::io::Error::new(std::io::ErrorKind::Other, "Test error"))),
+        ];
+
+        let stream = stream::iter(chunks);
+        let data_stream: DataStream = Box::pin(stream);
+
+        let mut response_stream = ResponseDataStream {
+            bytes: data_stream,
+            status_code: 200,
+        };
+
+        // First read should succeed
+        let mut buffer = [0u8; 20];
+        let n = response_stream.read(&mut buffer).await.unwrap();
+        assert_eq!(n, 9);
+        assert_eq!(&buffer[..n], b"Some data");
+
+        // Second read should fail with an error
+        let result = response_stream.read(&mut buffer).await;
+        assert!(result.is_err());
+    }
+
+    #[async_std::test]
+    async fn test_async_read_copy() {
+        // Test using async_std::io::copy which is a common use case
+        let chunks = vec![
+            Ok(Bytes::from("First chunk\n")),
+            Ok(Bytes::from("Second chunk\n")),
+            Ok(Bytes::from("Third chunk\n")),
+        ];
+
+        let stream = stream::iter(chunks);
+        let data_stream: DataStream = Box::pin(stream);
+
+        let mut response_stream = ResponseDataStream {
+            bytes: data_stream,
+            status_code: 200,
+        };
+
+        let mut output = Vec::new();
+        async_std::io::copy(&mut response_stream, &mut output).await.unwrap();
+
+        assert_eq!(output, b"First chunk\nSecond chunk\nThird chunk\n");
+    }
+}
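
Because both impls funnel stream failures through std::io::Error::other, a caller that needs S3-specific handling can recover the wrapped S3Error from the io::Error. A hedged, tokio-flavored sketch; the helper name and the fallback mapping to S3Error::Io are illustrative only, and it relies on S3Error satisfying the Error + Send + Sync bound that std::io::Error::other already requires of it:

```rust
use crate::error::S3Error;
use tokio::io::AsyncReadExt;

// Illustrative helper: read the whole body and, on failure, try to downcast the
// io::Error's source back to the S3Error that poll_read wrapped.
async fn read_or_report(mut stream: ResponseDataStream) -> Result<Vec<u8>, S3Error> {
    let mut out = Vec::new();
    match stream.read_to_end(&mut out).await {
        Ok(_) => Ok(out),
        Err(io_err) => {
            if let Some(s3_err) = io_err.get_ref().and_then(|e| e.downcast_ref::<S3Error>()) {
                // S3-specific handling could go here; this sketch just logs it.
                eprintln!("S3 error while reading body: {s3_err}");
            }
            // Hypothetical mapping; how to surface the error is up to the caller.
            Err(S3Error::Io(io_err))
        }
    }
}
```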