Skip to content

Commit 4bd021c

Browse files
committed
fix: resolve memory leaks in result set parsing
1 parent cc5f909 commit 4bd021c

File tree

5 files changed

+54
-43
lines changed

5 files changed

+54
-43
lines changed

mariadb/impl/client/async_client.py

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,12 @@ async def _parse_result_set(self, packet: PacketBuffer, config: 'Configuration',
505505
if not self.context.isEofDeprecated():
506506
# skip intermediate EOF packet
507507
(await self.read_stream.read_payload()).release()
508+
# Build decoder list once for all rows (performance optimization)
509+
decoders = self._build_decoder_list(columns, is_binary)
508510

511+
# Select appropriate row parser based on protocol
512+
row_parser = self._parse_binary_row_data if is_binary else self._parse_text_row_data
513+
509514
# Step 4: If unbuffered, create streaming result
510515
if not buffered:
511516
from ..result import AsyncStreamingResult
@@ -516,7 +521,8 @@ async def _parse_result_set(self, packet: PacketBuffer, config: 'Configuration',
516521
column_count=column_count,
517522
config=config,
518523
is_binary=is_binary,
519-
row_parser=self._parse_row_data # Pass row parser function
524+
row_parser=row_parser, # Pass appropriate row parser function
525+
decoders=decoders # Pass pre-built decoder list
520526
)
521527

522528
# Create completion with streaming result
@@ -527,47 +533,50 @@ async def _parse_result_set(self, packet: PacketBuffer, config: 'Configuration',
527533
# Step 5: Read row data packets until EOF (buffered mode)
528534
rows = []
529535
parser = PayloadParser(None)
530-
while True:
531-
row_packet = await self.read_stream.read_payload()
532-
# Check for EOF/OK packet based on DEPRECATE_EOF capability and packet length
533-
# EOF/OK packets start with 0xFE and have specific length constraints
534-
if (row_packet[0] == self.EOF_PACKET and
535-
((self.context.isEofDeprecated() and len(row_packet) < 16777215) or
536-
(not self.context.isEofDeprecated() and len(row_packet) < 8))):
537-
538-
if not self.context.isEofDeprecated():
539-
# Traditional EOF packet
540-
completion = EofPacket.decode(row_packet, self.context)
541-
else:
542-
# OK packet with 0xFE header (DEPRECATE_EOF enabled) - use existing OK packet parser
543-
completion = OkPacket.decode(row_packet, self.context)
544-
545-
# Apply converters to all rows at once
546-
rows = self._apply_converters_to_rows(rows, columns, config)
536+
try:
537+
while True:
538+
row_packet = await self.read_stream.read_payload()
539+
# Check for EOF/OK packet based on DEPRECATE_EOF capability and packet length
540+
# EOF/OK packets start with 0xFE and have specific length constraints
541+
if (row_packet[0] == self.EOF_PACKET and
542+
((self.context.isEofDeprecated() and len(row_packet) < 16777215) or
543+
(not self.context.isEofDeprecated() and len(row_packet) < 8))):
544+
545+
if not self.context.isEofDeprecated():
546+
# Traditional EOF packet
547+
completion = EofPacket.decode(row_packet, self.context)
548+
else:
549+
# OK packet with 0xFE header (DEPRECATE_EOF enabled) - use existing OK packet parser
550+
completion = OkPacket.decode(row_packet, self.context)
551+
552+
# Apply converters to all rows at once
553+
rows = self._apply_converters_to_rows(rows, columns, config)
547554

548-
# Create AsyncCompleteResult with all rows
549-
from ..result import AsyncCompleteResult
550-
complete_result = AsyncCompleteResult(
551-
columns=columns,
552-
column_count=column_count,
553-
config=config,
554-
rows=rows,
555-
is_binary=is_binary
556-
)
557-
558-
# Store result object in completion
559-
completion.result_set = complete_result
560-
561-
return completion
562-
elif row_packet[0] == self.ERROR_PACKET:
563-
raise ErrorPacket.decode(row_packet, self.context).toError(self.exception_factory)
555+
# Create AsyncCompleteResult with all rows
556+
from ..result import AsyncCompleteResult
557+
complete_result = AsyncCompleteResult(
558+
columns=columns,
559+
column_count=column_count,
560+
config=config,
561+
rows=rows,
562+
is_binary=is_binary
563+
)
564+
565+
# Store result object in completion
566+
completion.result_set = complete_result
567+
568+
return completion
569+
elif row_packet[0] == self.ERROR_PACKET:
570+
raise ErrorPacket.decode(row_packet, self.context).toError(self.exception_factory)
564571

565-
else:
566-
# Row data packet
567-
parser.set_buffer(row_packet)
568-
row_data = self._parse_row_data(parser, columns, config, is_binary)
569-
rows.append(row_data)
570-
572+
else:
573+
# Row data packet
574+
# Row data packet - use pre-built decoders
575+
parser.set_buffer(row_packet)
576+
rows.append(row_parser(parser, columns, config, decoders))
577+
finally:
578+
if parser and parser.packet:
579+
parser.packet.release()
571580

572581
except Exception as e:
573582
raise OperationalError(f"Failed to parse result set: {e}")

mariadb/impl/client/base_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,7 @@ def _parse_text_row_data(self, parser: PayloadParser, columns: List[ColumnDefini
569569
for i, decoder in enumerate(decoders):
570570
value = decoder(parser, columns[i], config)
571571
row_values[i] = value
572+
parser.packet.release()
572573
return tuple(row_values)
573574

574575
def _parse_binary_row_data(self, parser: PayloadParser, columns: List[ColumnDefinitionPacket], config: 'Configuration', decoders: List[Callable]) -> tuple:
@@ -590,6 +591,7 @@ def _parse_binary_row_data(self, parser: PayloadParser, columns: List[ColumnDefi
590591
# Use pre-built decoder for non-NULL values
591592
value = decoder(parser, columns[i], config)
592593
row_values[i] = value
594+
parser.packet.release()
593595
return tuple(row_values)
594596

595597

mariadb/impl/client/socket/payload_parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def read_null_terminated_string(self, encoding: str = 'utf-8') -> str:
152152
if self.packet[i] == 0x00:
153153
string_data = bytes(self.packet[self.pos:i]).decode(encoding)
154154
self.pos = i + 1
155-
return string_data
155+
return string_data
156156
string_data = bytes(self.packet[self.pos:]).decode(encoding)
157157
self.pos = len(self.packet)
158158
return string_data

mariadb/impl/client/socket/read_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def __len__(self):
4747

4848
def release(self) -> None:
4949
"""Release buffer back to stream (only needed for views)"""
50-
if self._is_view:
50+
if self._is_view and self._stream:
5151
self._stream._release_buffer()
5252
self._stream = None
5353

mariadb/impl/client/sync_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ def _parse_result_set(self, packet: PacketBuffer, config: 'Configuration', is_bi
481481
parser.set_buffer(row_packet)
482482
rows.append(row_parser(parser, columns, config, decoders))
483483
finally:
484-
if parser.packet:
484+
if parser and parser.packet:
485485
parser.packet.release()
486486

487487

0 commit comments

Comments
 (0)