Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 128 additions & 43 deletions src/cardutil/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,38 @@

["MTI", "DE2", "DE3", "DE4", "DE12", "DE14", "DE22", "DE23", "DE24", "DE25", "DE26"]

PDS Subfield Configuration
==========================

The config dictionary supports a ``PDS`` key for defining subfield extraction from Mastercard PDS fields.
This allows the iso8583 module to parse and expose subfields from PDS fields (e.g., PDS0105) as separate dictionary
keys.

.. code-block:: json

"PDS": {
"0105": {
"subfields": {
"1": {"field_start": 0, "field_name": "File Type", "field_length": 3, "field_python_type": "string"},
"2": {"field_start": 3, "field_name": "File Reference Date", "field_length": 6,
"field_python_type": "string"},
"3": {"field_start": 9, "field_name": "Processor ID", "field_length": 11,
"field_python_type": "string"},
"4": {"field_start": 20, "field_name": "File Sequence Number", "field_length": 5,
"field_python_type": "string"}
}
}
}

Each PDS field (e.g., ``0105``) can define a ``subfields`` dictionary, where each subfield specifies:

- ``field_start``: Start position in the PDS field data
- ``field_length``: Length of the subfield
- ``field_name``: Description
- ``field_python_type``: (optional) Type for conversion (``string``, ``int``, ``decimal``)

When parsing, the iso8583 module will extract these subfields and add them to the output dictionary with keys like
``PDS0105_SF1``, ``PDS0105_SF2``, etc.

mci_parameter_tables
====================
Expand Down Expand Up @@ -239,8 +271,10 @@
"field_type": "LLVAR",
"field_length": 0,
"field_processor": "DE43",
"field_processor_config": r"(?P<DE43_NAME>.+?) *\\(?P<DE43_ADDRESS>.+?) *\\(?P<DE43_SUBURB>.+?) *\\"
r"(?P<DE43_POSTCODE>.{10})(?P<DE43_STATE>.{3})(?P<DE43_COUNTRY>\S{3})$",
"field_processor_config": (
r"(?P<DE43_NAME>.+?) *\\(?P<DE43_ADDRESS>.+?) *\\(?P<DE43_SUBURB>.+?) *\\"
r"(?P<DE43_POSTCODE>.{10})(?P<DE43_STATE>.{3})(?P<DE43_COUNTRY>\S{3})$"
),
Comment on lines +274 to +277
Copy link

Copilot AI Sep 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The regex string is split across multiple lines using implicit string-literal concatenation inside parentheses. Note: adjacent string literals in Python concatenate into a single string — a tuple is only created when a comma is present — so the parenthesized form is valid and produces one regex string.

Suggested change
"field_processor_config": (
r"(?P<DE43_NAME>.+?) *\\(?P<DE43_ADDRESS>.+?) *\\(?P<DE43_SUBURB>.+?) *\\"
r"(?P<DE43_POSTCODE>.{10})(?P<DE43_STATE>.{3})(?P<DE43_COUNTRY>\S{3})$"
),
"field_processor_config":
r"(?P<DE43_NAME>.+?) *\\(?P<DE43_ADDRESS>.+?) *\\(?P<DE43_SUBURB>.+?) *\\"
r"(?P<DE43_POSTCODE>.{10})(?P<DE43_STATE>.{3})(?P<DE43_COUNTRY>\S{3})$",

Copilot uses AI. Check for mistakes.
},
"48": {
"field_name": "Additional data",
Expand Down Expand Up @@ -348,47 +382,52 @@
"field_length": 0,
},
},
"output_data_elements": [
"MTI",
"DE2",
"DE3",
"DE4",
"DE12",
"DE14",
"DE22",
"DE23",
"DE24",
"DE25",
"DE26",
"DE30",
"DE31",
"DE33",
"DE37",
"DE38",
"DE40",
"DE41",
"DE42",
"DE48",
"DE49",
"DE50",
"DE63",
"DE71",
"DE73",
"DE93",
"DE94",
"DE95",
"DE100",
"PDS0023",
"PDS0052",
"PDS0122",
"PDS0148",
"PDS0158",
"PDS0165",
"DE43_NAME",
"DE43_SUBURB",
"DE43_POSTCODE",
"ICC_DATA",
],
"PDS": {
"0001": {
"subfields": {
"1": {
"field_start": 0,
"field_name": "Account Number Type",
"field_length": 2,
"field_python_type": "string",
},
"2": {
"field_start": 2,
"field_name": "Account Number",
"field_length": 19,
"field_python_type": "string",
},
}
},
"0105": {
"subfields": {
"1": {
"field_start": 0,
"field_name": "File Type",
"field_length": 3,
"field_python_type": "string",
},
"2": {
"field_start": 3,
"field_name": "File Reference Date",
"field_length": 6,
"field_python_type": "string",
},
"3": {
"field_start": 9,
"field_name": "Processor ID",
"field_length": 11,
"field_python_type": "string",
},
"4": {
"field_start": 20,
"field_name": "File Sequence Number",
"field_length": 5,
"field_python_type": "string",
}
}
},
},
"mci_parameter_tables": {
"IP0006T1": {
"card_program_id": {"start": 19, "end": 22},
Expand Down Expand Up @@ -460,4 +499,50 @@
"life_cycle_indicator": {"start": 35, "end": 36},
},
},
"output_data_elements": [
"MTI",
"DE2",
"DE3",
"DE4",
"DE12",
"DE14",
"DE22",
"DE23",
"DE24",
"DE25",
"DE26",
"DE30",
"DE31",
"DE33",
"DE37",
"DE38",
"DE40",
"DE41",
"DE42",
"DE48",
"DE49",
"DE50",
"DE63",
"DE71",
"DE73",
"DE93",
"DE94",
"DE95",
"DE100",
"PDS0023",
"PDS0052",
"PDS0122",
"PDS0148",
"PDS0158",
"PDS0165",
# PDS0105 subfields
"PDS0105_SF1",
"PDS0105_SF2",
"PDS0105_SF3",
"PDS0105_SF4",
"DE43_NAME",
"DE43_SUBURB",
"DE43_POSTCODE",
"ICC_DATA",
],
}
90 changes: 67 additions & 23 deletions src/cardutil/iso8583.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,7 @@ def _get_date_from_string(field_data: str) -> datetime:
LOGGER.debug('Using dateutil parser')
return parser.parse(field_data)
except ImportError:
pass

parser = None
if sys.version_info >= (3, 7):
LOGGER.debug('Using fromisoformat')
return datetime.datetime.fromisoformat(field_data)
Expand Down Expand Up @@ -520,13 +519,20 @@ def _pds_to_de(dict_values):
return outputs


def _pds_to_dict(field_data):
def _pds_to_dict(field_data, pds_config=None):
"""
Get MasterCard pds fields from iso field

:param field_data: the ISO8583 field containing pds fields
:param pds_config: optional PDS configuration for subfield parsing
:return: dictionary of pds key values. Key in the form PDSxxxx where x is zero filled number of pds
"""
from cardutil.config import config as default_config

# Use provided config or default
if pds_config is None:
pds_config = default_config.get('PDS', {})

field_pointer = 0
return_values = {}

Expand All @@ -542,7 +548,15 @@ def _pds_to_dict(field_data):
# get the pds data
pds_field_data = field_data[field_pointer+7:field_pointer+7+pds_field_length]
LOGGER.debug("pds_field_data=[%s]", str(pds_field_data))
return_values["PDS" + pds_field_tag] = pds_field_data

pds_key = "PDS" + pds_field_tag
return_values[pds_key] = pds_field_data

# Parse subfields if configuration exists for this PDS field
if pds_field_tag in pds_config and 'subfields' in pds_config[pds_field_tag]:
subfield_config = pds_config[pds_field_tag]['subfields']
subfields = _parse_pds_subfields(pds_field_data, subfield_config, pds_field_tag)
return_values.update(subfields)

# increment the fieldPointer
field_pointer += 7+pds_field_length
Expand All @@ -569,7 +583,7 @@ def _icc_to_dict(field_data, processor_config=None):
processor_config_dict[k.strip()] = v.strip()
except Exception:
processor_config_dict = {}

# Get error handling configuration from bit_config.
DEFAULT_ERROR_HANDLING = 'WARN'
on_error = (processor_config_dict.get('on_error') or DEFAULT_ERROR_HANDLING).upper()
Expand All @@ -583,14 +597,7 @@ def _icc_to_dict(field_data, processor_config=None):
while field_pointer < len(field_data):
# get the tag id (one byte)
field_tag = field_data[field_pointer:field_pointer+1]

# Check if there's data to read
if not field_tag:
if on_error == "ERROR":
raise Iso8583DataError("DE55: No more data to read at position %d" % field_pointer)
LOGGER.warning("DE55: No more data to read at position %d", field_pointer)
break


# set to 2 bytes if 2 byte tag
if field_tag in TWO_BYTE_TAG_PREFIXES:
# Check if there's enough data for 2-byte tag
Expand Down Expand Up @@ -619,17 +626,10 @@ def _icc_to_dict(field_data, processor_config=None):
LOGGER.warning("DE55: No length byte available for tag %s at position %d",
field_tag_display, field_pointer)
break

field_length_raw = field_data[field_pointer:field_pointer+1]
LOGGER.debug(f"{field_length_raw=}")

# Check if we got any data
if not field_length_raw:
if on_error == "ERROR":
raise Iso8583DataError("DE55: Empty length byte for tag %s" % field_tag_display)
LOGGER.warning("DE55: Empty length byte for tag %s", field_tag_display)
break


field_length = struct.unpack(">B", field_length_raw)[0]

LOGGER.debug("%s", format(field_tag_display))
Expand Down Expand Up @@ -688,6 +688,50 @@ def _get_de43_fields(de43_field, processor_config=None):
return field_dict


if __name__ == '__main__':
def _parse_pds_subfields(pds_field_data, subfield_config, pds_field_tag):
    """
    Extract configured subfields from a PDS field's raw data.

    :param pds_field_data: the raw PDS field data string
    :param subfield_config: mapping of subfield number to its settings
        (``field_start``, ``field_length``, optional ``field_python_type``)
    :param pds_field_tag: the PDS field tag (e.g. "0105") used to build keys
    :return: dictionary keyed as PDS<field_tag>_SF<subfield_number>
    """
    parsed = {}
    data_length = len(pds_field_data)

    for number, settings in subfield_config.items():
        start = settings.get('field_start', 0)
        length = settings.get('field_length', 0)
        end = start + length

        # Skip any subfield whose span would run past the available data
        if end > data_length:
            continue

        value = pds_field_data[start:end]

        # Optional type conversion; keep the raw string when conversion fails
        python_type = settings.get('field_python_type', 'string')
        if python_type == 'int':
            try:
                value = int(value)
            except ValueError:
                pass
        elif python_type == 'decimal':
            try:
                value = decimal.Decimal(value)
            except (ValueError, decimal.InvalidOperation):
                pass

        key = f"PDS{pds_field_tag}_SF{number}"
        parsed[key] = value
        LOGGER.debug("Parsed subfield %s = %s", key, value)

    return parsed


if __name__ == '__main__': # pragma: no cover
import doctest
doctest.testmod()
3 changes: 1 addition & 2 deletions tests/cli/test_mci_ipm_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ def test_ipm_to_csv_input_params(self):
csv_output = csv_data.read()

self.assertEqual(csv_output, "MTI,DE38\n0100,nXmXlX\n")

os.remove(in_ipm_name)
os.remove(in_ipm_name + '.csv')

Expand Down Expand Up @@ -165,7 +164,6 @@ def test_ipm_to_csv_exception_reclen_over_3000_bytes(self):
self.assertEqual(-1, result)
print(output)


def test_ipm_to_csv_invalid_file(self):
"""
Check that detected as invalid IPM file
Expand All @@ -187,5 +185,6 @@ def test_ipm_to_csv_invalid_file(self):
self.assertEqual(-1, result)
print(output)


if __name__ == '__main__':
unittest.main()
Loading