11package com .getindata .connectors .http .internal .table .lookup ;
22
3- import java .io .IOException ;
43import java .net .URI ;
54import java .net .http .HttpClient ;
65import java .net .http .HttpRequest ;
@@ -228,42 +227,12 @@ public void shouldBuildClientWithHeaders() throws ConfigurationException {
228227 }
229228
230229 @ Test
231- public void shouldCallCloseOnCollectorInDeserializeSingleValue () throws Exception {
230+ public void shouldCollectRowDataInCollector () throws ConfigurationException {
232231 // GIVEN
233- // Track if close() and collect() were called
234- final int [] collectCount = {0 };
235- final boolean [] closeCalled = {false };
236-
237- // Create a mock DeserializationSchema that calls both collect() and close()
238- DeserializationSchema <RowData > mockDecoder = new DeserializationSchema <RowData >() {
239- @ Override
240- public RowData deserialize (byte [] message ) throws IOException {
241- return null ;
242- }
243-
244- @ Override
245- public void deserialize (byte [] message , Collector <RowData > out ) throws IOException {
246- // Simulate adding multiple records
247- out .collect (GenericRowData .of (StringData .fromString ("test1" )));
248- out .collect (GenericRowData .of (StringData .fromString ("test2" )));
249- // Call close() to trigger the code path we want to test
250- out .close ();
251- }
252-
253- @ Override
254- public boolean isEndOfStream (RowData nextElement ) {
255- return false ;
256- }
257-
258- @ Override
259- public org .apache .flink .api .common .typeinfo .TypeInformation <RowData > getProducedType () {
260- return null ;
261- }
262- };
263-
232+ List <RowData > result = new ArrayList <>();
264233 JavaNetHttpPollingClient client = new JavaNetHttpPollingClient (
265234 httpClient ,
266- mockDecoder ,
235+ decoder ,
267236 options ,
268237 new GetRequestFactory (
269238 new GenericGetQueryCreator (lookupRow ),
@@ -272,56 +241,53 @@ public org.apache.flink.api.common.typeinfo.TypeInformation<RowData> getProduced
272241 )
273242 );
274243
275- // WHEN
276- // Call deserializeSingleValue through reflection to test it directly
277- java .lang .reflect .Method method = JavaNetHttpPollingClient .class
278- .getDeclaredMethod ("deserializeSingleValue" , byte [].class );
279- method .setAccessible (true );
244+ Collector <RowData > collector = client .createRowDataCollector (result );
280245
281- byte [] testData = "{\" test\" :\" value\" }" .getBytes ();
282- List <RowData > result = (List <RowData >) method .invoke (client , testData );
246+ RowData row1 = GenericRowData .of (StringData .fromString ("test1" ));
247+ RowData row2 = GenericRowData .of (StringData .fromString ("test2" ));
248+
249+ // WHEN
250+ collector .collect (row1 );
251+ collector .collect (row2 );
283252
284253 // THEN
285- // Verify that the method executed successfully and returned results
286- assertThat (result ).isNotNull ();
287254 assertThat (result ).hasSize (2 );
288- assertThat (result .get (0 ).getString (0 ).toString ()).isEqualTo ("test1" );
289- assertThat (result .get (1 ).getString (0 ).toString ()).isEqualTo ("test2" );
290-
291- // Note: The close() method in the anonymous Collector is a no-op,
292- // but this test ensures it's called and doesn't throw exceptions
255+ assertThat (result .get (0 )).isEqualTo (row1 );
256+ assertThat (result .get (1 )).isEqualTo (row2 );
293257 }
294258
295259 @ Test
296- public void shouldHandleEmptyResultInDeserializeSingleValue () throws Exception {
260+ public void shouldCallCloseOnRowDataCollectorWithoutException () throws ConfigurationException {
297261 // GIVEN
298- // Create a mock DeserializationSchema that doesn't add any records but calls close()
299- DeserializationSchema <RowData > mockDecoder = new DeserializationSchema <RowData >() {
300- @ Override
301- public RowData deserialize (byte [] message ) throws IOException {
302- return null ;
303- }
304-
305- @ Override
306- public void deserialize (byte [] message , Collector <RowData > out ) throws IOException {
307- // Don't collect anything, just close
308- out .close ();
309- }
310-
311- @ Override
312- public boolean isEndOfStream (RowData nextElement ) {
313- return false ;
314- }
315-
316- @ Override
317- public org .apache .flink .api .common .typeinfo .TypeInformation <RowData > getProducedType () {
318- return null ;
319- }
320- };
262+ List <RowData > result = new ArrayList <>();
263+ JavaNetHttpPollingClient client = new JavaNetHttpPollingClient (
264+ httpClient ,
265+ decoder ,
266+ options ,
267+ new GetRequestFactory (
268+ new GenericGetQueryCreator (lookupRow ),
269+ headerPreprocessor ,
270+ options
271+ )
272+ );
273+
274+ Collector <RowData > collector = client .createRowDataCollector (result );
275+ collector .collect (GenericRowData .of (StringData .fromString ("test" )));
276+
277+ // WHEN - close should not throw any exception
278+ collector .close ();
279+
280+ // THEN
281+ assertThat (result ).hasSize (1 );
282+ }
321283
284+ @ Test
285+ public void shouldHandleEmptyCollectionInRowDataCollector () throws ConfigurationException {
286+ // GIVEN
287+ List <RowData > result = new ArrayList <>();
322288 JavaNetHttpPollingClient client = new JavaNetHttpPollingClient (
323289 httpClient ,
324- mockDecoder ,
290+ decoder ,
325291 options ,
326292 new GetRequestFactory (
327293 new GenericGetQueryCreator (lookupRow ),
@@ -330,16 +296,12 @@ public org.apache.flink.api.common.typeinfo.TypeInformation<RowData> getProduced
330296 )
331297 );
332298
333- // WHEN
334- java .lang .reflect .Method method = JavaNetHttpPollingClient .class
335- .getDeclaredMethod ("deserializeSingleValue" , byte [].class );
336- method .setAccessible (true );
299+ Collector <RowData > collector = client .createRowDataCollector (result );
337300
338- byte [] testData = "{}" . getBytes ();
339- List < RowData > result = ( List < RowData >) method . invoke ( client , testData );
301+ // WHEN - close without collecting anything
302+ collector . close ( );
340303
341304 // THEN
342- assertThat (result ).isNotNull ();
343305 assertThat (result ).isEmpty ();
344306 }
345307
0 commit comments