@@ -29,11 +29,12 @@ class FileOutput < Output
2929
3030 helpers :formatter , :inject , :compat_parameters
3131
32- SUPPORTED_COMPRESS = [ :text , :gz , :gzip ]
32+ SUPPORTED_COMPRESS = [ :text , :gz , :gzip , :zstd ]
3333 SUPPORTED_COMPRESS_MAP = {
3434 text : nil ,
3535 gz : :gzip ,
3636 gzip : :gzip ,
37+ zstd : :zstd ,
3738 }
3839
3940 DEFAULT_TIMEKEY = 60 * 60 * 24
@@ -184,6 +185,14 @@ def configure(conf)
184185 @buffer . symlink_path = @symlink_path
185186 @buffer . output_plugin_for_symlink = self
186187 end
188+
189+ unless @buffer . compress == :text
190+ if @compress == :text
191+ log . info "buffer is compressed. If you also want to save disk space, Add `compress` configuration in <match>"
192+ elsif @buffer . compress != @compress_method
193+ raise Fluent ::ConfigError , "You cannot specify different compression formats for Buffer (Buffer: #{ @buffer . compress } , Self: #{ @compress } )"
194+ end
195+ end
187196 end
188197
189198 @dir_perm = system_config . dir_permission || Fluent ::DEFAULT_DIR_PERMISSION
@@ -212,17 +221,17 @@ def write(chunk)
212221 FileUtils . mkdir_p File . dirname ( path ) , mode : @dir_perm
213222
214223 writer = case
215- when @compress_method . nil?
216- method ( :write_without_compression )
217- when @compress_method == :gzip
218- if @buffer . compress != :gzip || @recompress
219- method ( :write_gzip_with_compression )
220- else
221- method ( :write_gzip_from_gzipped_chunk )
222- end
223- else
224- raise "BUG: unknown compression method #{ @compress_method } "
225- end
224+ when @compress_method . nil?
225+ method ( :write_without_compression )
226+ when @compress_method != :text
227+ if @buffer . compress == :text || @recompress
228+ method ( :write_with_compression ) . curry . call ( @compress_method )
229+ else
230+ method ( :write_from_compressed_chunk ) . curry . call ( @compress_method )
231+ end
232+ else
233+ raise "BUG: unknown compression method #{ @compress_method } "
234+ end
226235
227236 if @append
228237 if @need_lock
@@ -253,17 +262,22 @@ def write_without_compression(path, chunk)
253262 end
254263 end
255264
256- def write_gzip_with_compression ( path , chunk )
265+ def write_with_compression ( type , path , chunk )
257266 File . open ( path , "ab" , @file_perm ) do |f |
258- gz = Zlib ::GzipWriter . new ( f )
267+ gz = nil
268+ if type == :gzip
269+ gz = Zlib ::GzipWriter . new ( f )
270+ elsif type == :zstd
271+ gz = Zstd ::StreamWriter . new ( f )
272+ end
259273 chunk . write_to ( gz , compressed : :text )
260274 gz . close
261275 end
262276 end
263277
264- def write_gzip_from_gzipped_chunk ( path , chunk )
278+ def write_from_compressed_chunk ( type , path , chunk )
265279 File . open ( path , "ab" , @file_perm ) do |f |
266- chunk . write_to ( f , compressed : :gzip )
280+ chunk . write_to ( f , compressed : type )
267281 end
268282 end
269283
@@ -280,6 +294,7 @@ def timekey_to_timeformat(timekey)
280294 def compression_suffix ( compress )
281295 case compress
282296 when :gzip then '.gz'
297+ when :zstd then '.zstd'
283298 when nil then ''
284299 else
285300 raise ArgumentError , "unknown compression type #{ compress } "
0 commit comments