Skip to content

Commit

Permalink
Merge "Add file format detection to format_inspector" into unmaintain…
Browse files Browse the repository at this point in the history
…ed/yoga
  • Loading branch information
Zuul authored and openstack-gerrit committed Feb 14, 2025
2 parents 80aa538 + 28cac20 commit db59e4b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 67 deletions.
54 changes: 43 additions & 11 deletions glance/common/format_inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -883,20 +883,52 @@ def close(self):
self._source.close()


ALL_FORMATS = {
'raw': FileInspector,
'qcow2': QcowInspector,
'vhd': VHDInspector,
'vhdx': VHDXInspector,
'vmdk': VMDKInspector,
'vdi': VDIInspector,
'qed': QEDInspector,
}


def get_inspector(format_name):
"""Returns a FormatInspector class based on the given name.
:param format_name: The name of the disk_format (raw, qcow2, etc).
:returns: A FormatInspector or None if unsupported.
"""
formats = {
'raw': FileInspector,
'qcow2': QcowInspector,
'vhd': VHDInspector,
'vhdx': VHDXInspector,
'vmdk': VMDKInspector,
'vdi': VDIInspector,
'qed': QEDInspector,
}

return formats.get(format_name)

return ALL_FORMATS.get(format_name)


def detect_file_format(filename):
"""Attempts to detect the format of a file.
This runs through a file one time, running all the known inspectors in
parallel. It stops reading the file once one of them matches or all of
them are sure they don't match.
Returns the FileInspector that matched, if any. None if 'raw'.
"""
inspectors = {k: v() for k, v in ALL_FORMATS.items()}
with open(filename, 'rb') as f:
for chunk in chunked_reader(f):
for format, inspector in list(inspectors.items()):
try:
inspector.eat_chunk(chunk)
except ImageFormatError:
# No match, so stop considering this format
inspectors.pop(format)
continue
if (inspector.format_match and inspector.complete and
format != 'raw'):
# First complete match (other than raw) wins
return inspector
if all(i.complete for i in inspectors.values()):
# If all the inspectors are sure they are not a match, avoid
# reading to the end of the file to settle on 'raw'.
break
return inspectors['raw']
56 changes: 0 additions & 56 deletions glance/tests/unit/common/test_format_inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,62 +315,6 @@ def test_qcow2_feature_flag_checks(self):
data[0x4F] = 0x80
self.assertTrue(inspector.has_unknown_features)

def test_vmdk_safety_checks(self):
region = format_inspector.CaptureRegion(0, 0)
inspector = format_inspector.VMDKInspector()
inspector.new_region('descriptor', region)

# This should be a legit VMDK descriptor which comments, blank lines,
# an extent, some ddb content, and some header values.
legit_desc = ['# This is a comment',
'',
' ',
'createType=monolithicSparse',
'RW 1234 SPARSE "foo.vmdk"',
'ddb.adapterType = "MFM',
'# EOF']
region.data = ('\n'.join(legit_desc)).encode('ascii')
region.length = len(region.data)
self.assertTrue(inspector.safety_check())

# Any of these lines should trigger an error indicating that there is
# something in the descriptor we don't understand
bad_lines = [
'#\U0001F4A9',
'header Name=foo',
'foo bar',
'WR 123 SPARSE "foo.vmdk"',
]

for bad_line in bad_lines:
# Encode as UTF-8 purely so we can test that anything non-ASCII
# will trigger the decode check
region.data = bad_line.encode('utf-8')
region.length = len(region.data)
self.assertRaisesRegex(format_inspector.ImageFormatError,
'Invalid VMDK descriptor',
inspector.safety_check)

# Extents with slashes in the name fail the safety check
region.data = b'RW 123 SPARSE "/etc/shadow"'
region.length = len(region.data)
self.assertFalse(inspector.safety_check())

# A descriptor that specifies no extents fails the safety check
region.data = b'# Nothing'
region.length = len(region.data)
self.assertFalse(inspector.safety_check())

def test_vmdk_reject_footer(self):
data = struct.pack('<4sIIQQQQIQQ', b'KDMV', 3, 0, 0, 0, 0, 1, 0, 0,
format_inspector.VMDKInspector.GD_AT_END)
inspector = format_inspector.VMDKInspector()
inspector.region('header').data = data
inspector.region('header').length = len(data)
self.assertRaisesRegex(format_inspector.ImageFormatError,
'footer',
inspector.post_process)

def test_vdi(self):
self._test_format('vdi')

Expand Down

0 comments on commit db59e4b

Please sign in to comment.