@@ -508,7 +508,7 @@ def create_driver(conf=base_config)
508508  end 
509509
510510  sub_test_case  'compressed packed forward'  do 
511-     test  'set_compress_to_option '  do 
511+     test  'set_compress_to_option_gzip '  do 
512512      @d  =  d  =  create_driver 
513513
514514      time_i  =  event_time ( "2011-01-02 13:14:15 UTC" ) . to_i 
@@ -535,6 +535,33 @@ def create_driver(conf=base_config)
535535      assert_equal  events ,  d . events 
536536    end 
537537
538+     test  'set_compress_to_option_zstd'  do 
539+       @d  =  d  =  create_driver 
540+ 
541+       time_i  =  event_time ( "2011-01-02 13:14:15 UTC" ) . to_i 
542+       events  =  [ 
543+         [ "tag1" ,  time_i ,  { "a" => 1 } ] , 
544+         [ "tag1" ,  time_i ,  { "a" => 2 } ] 
545+       ] 
546+ 
547+       # create compressed entries 
548+       entries  =  '' 
549+       events . each  do  |_tag ,  _time ,  record |
550+         v  =  [ _time ,  record ] . to_msgpack 
551+         entries  << compress ( v ,  type : :zstd ) 
552+       end 
553+       chunk  =  [ "tag1" ,  entries ,  {  'compressed'  =>  'zstd'  } ] . to_msgpack 
554+ 
555+       d . run  do 
556+         Fluent ::MessagePackFactory . msgpack_unpacker . feed_each ( chunk )  do  |obj |
557+           option  =  d . instance . send ( :on_message ,  obj ,  chunk . size ,  DUMMY_SOCK ) 
558+           assert_equal  'zstd' ,  option [ 'compressed' ] 
559+         end 
560+       end 
561+ 
562+       assert_equal  events ,  d . events 
563+     end 
564+ 
538565    test  'create_CompressedMessagePackEventStream_with_gzip_compress_option'  do 
539566      @d  =  d  =  create_driver 
540567
@@ -562,6 +589,34 @@ def create_driver(conf=base_config)
562589        end 
563590      end 
564591    end 
592+ 
593+     test  'create_CompressedMessagePackEventStream_with_zstd_compress_option'  do 
594+       @d  =  d  =  create_driver 
595+ 
596+       time_i  =  event_time ( "2011-01-02 13:14:15 UTC" ) . to_i 
597+       events  =  [ 
598+         [ "tag1" ,  time_i ,  { "a" => 1 } ] , 
599+         [ "tag1" ,  time_i ,  { "a" => 2 } ] 
600+       ] 
601+ 
602+       # create compressed entries 
603+       entries  =  '' 
604+       events . each  do  |_tag ,  _time ,  record |
605+         v  =  [ _time ,  record ] . to_msgpack 
606+         entries  << compress ( v ) 
607+       end 
608+       chunk  =  [ "tag1" ,  entries ,  {  'compressed'  =>  'zstd'  } ] . to_msgpack 
609+ 
610+       # check CompressedMessagePackEventStream is created 
611+       mock ( Fluent ::CompressedMessagePackEventStream ) . new ( entries ,  nil ,  0 ,  compress : :zstd ) 
612+ 
613+       d . run  do 
614+         Fluent ::MessagePackFactory . msgpack_unpacker . feed_each ( chunk )  do  |obj |
615+           option  =  d . instance . send ( :on_message ,  obj ,  chunk . size ,  DUMMY_SOCK ) 
616+           assert_equal  'zstd' ,  option [ 'compressed' ] 
617+         end 
618+       end 
619+     end 
565620  end 
566621
567622  sub_test_case  'warning'  do 
0 commit comments