Skip to content

fix broken gzip file produced sometimes #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 68 additions & 7 deletions lib/logstash/outputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,53 @@ def multi_receive_encoded(events_and_encoded)
# append to the file
chunks.each {|chunk| fd.write(chunk) }
end
fd.flush unless @flusher && @flusher.alive?
on_flush(fd, path) unless @flusher && @flusher.alive?
end

close_stale_files
end
end

def on_flush(fd, path)
fd.flush
if fd.is_temp
copy_to_gzip(fd, path)
end
end

def copy_to_gzip(fd, path)
zipfd = get_file(path)
zipfd = Zlib::GzipWriter.new(zipfd)
fd.seek(0, IO::SEEK_SET)
data = fd.read
fd.truncate(0)
fd.seek(0, IO::SEEK_SET)
if @write_behavior == "overwrite"
zipfd.to_io.truncate(0)
zipfd.to_io.seek(0, IO::SEEK_SET)
end
zipfd.write(data)
zipfd.flush
zipfd.to_io.flush
zipfd.close
end

def close
@flusher.stop unless @flusher.nil?
@io_mutex.synchronize do
@logger.debug("Close: closing files")

@files.each do |path, fd|
begin
if fd.is_temp
copy_to_gzip(fd, path)
end
fd.close
@logger.debug("Closed file #{path}", :fd => fd)
if fd.is_temp && File.exist?(fd.path)
File.delete(fd.path)
@logger.debug("Deleted temp file ", :path => fd.path)
end
@logger.debug("Closed file #{fd}", :fd => fd)
rescue Exception => e
@logger.error("Exception while flushing and closing files.", :exception => e)
end
Expand Down Expand Up @@ -202,6 +233,9 @@ def flush_pending_files
@files.each do |path, fd|
@logger.debug("Flushing file", :path => path, :fd => fd)
fd.flush
if fd.is_temp
copy_to_gzip(fd, path)
end
end
end
rescue => e
Expand All @@ -220,6 +254,9 @@ def close_stale_files
inactive_files.each do |path, fd|
@logger.info("Closing file %s" % path)
fd.close
if fd.is_temp && File.exist?(fd.path)
File.delete(fd.path)
end
@files.delete(path)
end
# mark all files as inactive, a call to write will mark them as active again
Expand All @@ -236,6 +273,7 @@ def deleted?(path)
end

def open(path)
originalPath = path
if !deleted?(path) && cached?(path)
return @files[path]
end
Expand All @@ -249,8 +287,27 @@ def open(path)
end
end

#Fix for broken gzip issue.
if gzip
tmpfile = java.io.File.createTempFile("outfile-", "-temp");
path = tmpfile.path
#create file at original path also, so that temp file is not created again
make_dir(originalPath)
gzFile = get_file(originalPath)
#if gzFile is fifo type, file writer object is returned that needs to closed.
if gzFile.class == Java::JavaIo::FileWriter
gzFile.close
end
end

@logger.info("Opening file", :path => path)
make_dir(path)
fd = get_file(path)
@files[originalPath] = IOWriter.new(fd, gzip)
return @files[originalPath]
end

def make_dir(path)
dir = File.dirname(path)
if !Dir.exist?(dir)
@logger.info("Creating directory", :directory => dir)
Expand All @@ -260,7 +317,9 @@ def open(path)
FileUtils.mkdir_p(dir)
end
end
end

def get_file(path)
# work around a bug opening fifos (bug JRUBY-6280)
stat = File.stat(path) rescue nil
if stat && stat.ftype == "fifo"
Expand All @@ -272,10 +331,7 @@ def open(path)
fd = File.new(path, "a+")
end
end
if gzip
fd = Zlib::GzipWriter.new(fd)
end
@files[path] = IOWriter.new(fd)
return fd
end

##
Expand Down Expand Up @@ -356,8 +412,9 @@ def run
class IOWriter
attr_accessor :active

def initialize(io)
def initialize(io, is_temp)
@io = io
@is_temp = is_temp
end

def write(*args)
Expand All @@ -372,6 +429,10 @@ def flush
end
end

def is_temp
return @is_temp
end

def method_missing(method_name, *args, &block)
if @io.respond_to?(method_name)

Expand Down
16 changes: 14 additions & 2 deletions spec/outputs/file_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,21 @@

agent do
line_num = 0
global_events = []
# Now check all events for order and correctness.
events = Zlib::GzipReader.open(tmp_file.path).map {|line| LogStash::Event.new(LogStash::Json.load(line)) }
sorted = events.sort_by {|e| e.get("sequence")}
File.open(tmp_file.path) do |file|
zio = file
loop do
io = Zlib::GzipReader.new(zio)
events = io.map {|line| LogStash::Event.new(LogStash::Json.load(line)) }
global_events = global_events + events
unused = io.unused
io.finish
break if unused.nil?
zio.pos -= unused.length
end
end
sorted = global_events.sort_by {|e| e.get("sequence")}
sorted.each do |event|
insist {event.get("message")} == "hello world"
insist {event.get("sequence")} == line_num
Expand Down