diff --git a/pypdf/_utils.py b/pypdf/_utils.py
index 871e8b825..666facd2c 100644
--- a/pypdf/_utils.py
+++ b/pypdf/_utils.py
@@ -457,6 +457,21 @@ def logger_warning(msg: str, src: str) -> None:
     logging.getLogger(src).warning(msg)
 
 
+def logger_debug(msg: str, src: str) -> None:
+    """
+    Use this instead of logger.debug directly.
+
+    That allows people to overwrite it more easily.
+
+    ## Exception, warnings.warn, logger_warning, logger_debug
+    - Exceptions → when the user has to handle the error (e.g. a broken PDF).
+    - warnings.warn → when the user has to fix their code (e.g. DeprecationWarnings).
+    - logger_warning → when pypdf handles a problem but keeps working (strict=False mode).
+    - logger_debug → detailed debugging information, useful for developers.
+    """
+    logging.getLogger(src).debug(msg)
+
+
 def rename_kwargs(
     func_name: str, kwargs: dict[str, Any], aliases: dict[str, str], fail: bool = False
 ) -> None:
diff --git a/pypdf/_writer.py b/pypdf/_writer.py
index a2209c8f4..b0b93db43 100644
--- a/pypdf/_writer.py
+++ b/pypdf/_writer.py
@@ -40,14 +40,13 @@
 from re import Pattern
 from types import TracebackType
 from typing import (
-    IO,
     Any,
     Callable,
+    cast,
+    IO,
     Optional,
     Union,
-    cast,
 )
-
 from ._cmap import _default_fonts_space_width, build_char_map_from_dict
 from ._doc_common import DocumentInformation, PdfDocCommon
 from ._encryption import EncryptAlgorithm, Encryption
@@ -59,6 +58,7 @@
     StreamType,
     _get_max_pdf_version_header,
     deprecation_no_replacement,
+    logger_debug,
     logger_warning,
 )
 from .constants import AnnotationDictionaryAttributes as AA
@@ -80,7 +80,6 @@
 from .constants import TrailerKeys as TK
 from .errors import PyPdfError
 from .generic import (
-    PAGE_FIT,
     ArrayObject,
     BooleanObject,
     ByteStringObject,
@@ -95,6 +94,7 @@
     NameObject,
     NullObject,
     NumberObject,
+    PAGE_FIT,
     PdfObject,
     RectangleObject,
     ReferenceLink,
@@ -1679,75 +1679,214 @@ def compress_identical_objects(
         remove_orphans: bool = True,
     ) -> None:
         """
-        Parse the PDF file and merge objects that have the same hash.
-        This will make objects common to multiple pages.
-        Recommended to be used just before writing output.
+        Compress identical objects in the PDF file, making objects that occur
+        on multiple pages shared. Recommended just before writing output.
 
         Args:
             remove_identicals: Remove identical objects.
             remove_orphans: Remove unreferenced objects.
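+
+        Example (a minimal sketch; assumes ``writer`` is an already populated
+        PdfWriter that is written out immediately afterwards)::
+
+            writer.compress_identical_objects(remove_orphans=True)
+            with open("compressed.pdf", "wb") as fp:
+                writer.write(fp)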
+        """
+        if not remove_identicals and not remove_orphans:
+            logger_warning(
+                "Both remove_identicals and remove_orphans are disabled. Nothing has been done.",
+                __name__,
+            )
+            return
+
+        orphans: Optional[list[bool]] = None
+        if remove_identicals:
+            # When remove_orphans=True, also mark reachability while rewriting refs
+            orphans = self._remove_identical_objects(mark_orphans=remove_orphans)
+
+        if remove_orphans:
+            if orphans is None:
+                # recursive mark phase
+                orphans = self._perform_orphan_recursive_mark_phase()
+            # sweep phase
+            assert orphans is not None
+            self._perform_orphan_sweep_phase(orphans)
+
+    def _remove_identical_objects(self, mark_orphans: bool) -> Optional[list[bool]]:
         """
+        Detect identical objects by hash and collapse duplicates.
+
+        If mark_orphans is True, mark reachable objects while rewriting
+        references so that the caller can later sweep truly unreachable ones.
+
+        This function:
+        - groups objects by obj.hash_value() -- beware that a hash collision
+          would wrongly merge two distinct objects,
+        - keeps the first occurrence as the representative,
+        - for each duplicate sets self._objects[idx] = None (we do not insert
+          an IndirectObject here),
+        - updates references inside *live* objects so that any IndirectObject
+          that previously pointed to a duplicate will point to the
+          representative.
+
+        IMPORTANT: This function does NOT remove unreachable objects (orphans).
+        Removing unreachable objects is done by the mark & sweep orphan path
+        outside of this function.
+
+        Args:
+            mark_orphans: if True, mark reachability while rewriting references.
+
-
-        def replace_in_obj(
-            obj: PdfObject, crossref: dict[IndirectObject, IndirectObject]
+        Returns:
+            Optional[list[bool]]: a boolean list the size of self._objects where
+            True means "unvisited (potential orphan)" and False means
+            "reachable". Returns None if mark_orphans is False.
+        """
+        # -----
+        def replace_indirect_object(
+            obj: PdfObject,
+            crossref: dict[IndirectObject, IndirectObject],
+            mark_orphans: bool,
         ) -> None:
-            if isinstance(obj, DictionaryObject):
-                key_val = obj.items()
+            """
+            Replace references to duplicates with their representative.
+
+            If mark_orphans is True, mark reachable objects while rewriting so
+            that the caller can later sweep truly unreachable ones.
+
+            Security note: this function is recursive. A maliciously crafted
+            PDF with deeply nested direct objects could exhaust Python's
+            recursion limit.
+            """
+            if isinstance(obj, (DictionaryObject, StreamObject)):
+                key_value_pairs = obj.items()
             elif isinstance(obj, ArrayObject):
-                key_val = enumerate(obj)  # type: ignore
-            else:
+                key_value_pairs = enumerate(obj)
+            else:  # primitives
                 return
-            assert isinstance(obj, (DictionaryObject, ArrayObject))
-            for k, v in key_val:
-                if isinstance(v, IndirectObject):
-                    orphans[v.idnum - 1] = False
-                    if v in crossref:
-                        obj[k] = crossref[v]
+
+            # Traverse nested containers
+            for key, value in key_value_pairs:
+                if isinstance(value, IndirectObject):
+                    if mark_orphans and orphans is not None:
+                        # idnum is 1-based; the list is 0-based
+                        orphans[value.idnum - 1] = False
+                    if value in crossref:
+                        obj[key] = crossref[value]
                 else:
-                    """the filtering on DictionaryObject and ArrayObject only
-                    will be performed within replace_in_obj"""
-                    replace_in_obj(v, crossref)
-
-        # _idnum_hash :dict[hash]=(1st_ind_obj,[other_indir_objs,...])
-        self._idnum_hash = {}
-        orphans = [True] * len(self._objects)
-        # look for similar objects
-        for idx, obj in enumerate(self._objects):
+                    replace_indirect_object(value, crossref, mark_orphans)
+        # -----
+
+        # Map: hash_value -> (representative_indirect, [duplicate_indirects...])
+        self._idnum_hash: dict[int, tuple[IndirectObject, list[IndirectObject]]] = {}
+        objects = list(self._objects)  # snapshot; self._objects is mutated below
+        orphans: Optional[list[bool]] = [True] * len(objects) if mark_orphans else None
+
+        # PHASE 1: Group objects by hash and blank out duplicates
+        for idx, obj in enumerate(objects):
             if is_null_or_none(obj):
                 continue
-            assert obj is not None, "mypy"  # mypy: TypeGuard of `is_null_or_none` does not help here.
+            assert obj is not None
             assert isinstance(obj.indirect_reference, IndirectObject)
-            h = obj.hash_value()
-            if remove_identicals and h in self._idnum_hash:
-                self._idnum_hash[h][1].append(obj.indirect_reference)
+            hsh = obj.hash_value()
+            if hsh in self._idnum_hash:
+                # duplicate: record its indirect reference and clear the slot
+                self._idnum_hash[hsh][1].append(obj.indirect_reference)
                 self._objects[idx] = None
+                # self._objects is mutated in place here while we iterate over
+                # the `objects` snapshot taken above, so this loop is safe; it
+                # would only become a concern if multi-threading were added.
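+                # The slot is cleared rather than deleted so that the 1-based
+                # idnum -> 0-based index mapping into self._objects stays
+                # valid for every remaining object.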
+ """ else: - self._idnum_hash[h] = (obj.indirect_reference, []) + self._idnum_hash[hsh] = (obj.indirect_reference, []) - # generate the dict converting others to 1st - cnv = {v[0]: v[1] for v in self._idnum_hash.values() if len(v[1]) > 0} + # PHASE 2: Build reverse conversion map {dup_indirect -> rep_indirect} cnv_rev: dict[IndirectObject, IndirectObject] = {} - for k, v in cnv.items(): - cnv_rev.update(zip(v, (k,) * len(v))) + for rep, dups in self._idnum_hash.values(): + for dup in dups: + cnv_rev[dup] = rep - # replace reference to merged objects + # PHASE 3: Rewrite references inside remaining live container objects for obj in self._objects: - if isinstance(obj, (DictionaryObject, ArrayObject)): - replace_in_obj(obj, cnv_rev) + if isinstance(obj, (DictionaryObject, StreamObject, ArrayObject)): + replace_indirect_object(obj, cnv_rev, mark_orphans) - # remove orphans (if applicable) - orphans[self.root_object.indirect_reference.idnum - 1] = False # type: ignore + if mark_orphans: + orphans[self.root_object.indirect_reference.idnum - 1] = False + orphans[self._info.indirect_reference.idnum - 1] = False + + try: + orphans[self._ID.indirect_reference.idnum - 1] = False + except AttributeError: + logger_debug( + f"AttributeError: {__name__} has no attribute '_ID'.", + __name__ + ) + return orphans - orphans[self._info.indirect_reference.idnum - 1] = False # type: ignore + def _perform_orphan_recursive_mark_phase( + self + ) -> list[bool]: + """ + Mark phase: mark reachable objects during the rewrite so that the caller + can later sweep truly unreachable ones. + Returns: + list[bool]: a boolean list the size of self._objects where True means + "unvisited (potential orphan)" and False means "reachable". Returns None if + mark_orphans is False. + """ + # ----- + def mark_orphans( + obj: PdfObject + )-> list[bool]: + """ + Mark reachable objects during the rewrite so that the caller + can later sweep truly unreachable ones. + + --- SECURITY CONCERN !!! + This function is recursive. A maliciusly crafted PDF file could cause an infinite loop + """ + if isinstance(obj, (DictionaryObject, StreamObject)): + key_value_pairs = obj.items() + elif isinstance(obj, ArrayObject): + key_value_pairs = enumerate(obj) + else: # ignore primitives + return + assert isinstance(obj, (DictionaryObject, StreamObject, ArrayObject)) + + for key, value in key_value_pairs: + if isinstance(value, IndirectObject): + orphans[value.idnum - 1] = False + else: + mark_orphans(value) + # ----- + objects = list(self._objects) + orphans = [True] * len(objects) + + for obj in self._objects: + if isinstance(obj, (DictionaryObject, StreamObject, ArrayObject)): + mark_orphans(obj) + + orphans[self.root_object.indirect_reference.idnum - 1] = False + orphans[self._info.indirect_reference.idnum - 1] = False + try: - orphans[self._ID.indirect_reference.idnum - 1] = False # type: ignore + orphans[self._ID.indirect_reference.idnum - 1] = False except AttributeError: - pass + logger_debug( + f"AttributeError: {__name__} has no attribute '_ID'.", + __name__ + ) + return orphans + + def _perform_orphan_sweep_phase( + self, + orphans: list[bool], + ) -> None: + """ + Perform orphan sweep phase removing the unreachable objects. + + Args: + orphans (list[bool]): a boolean list the size of self._objects where True means + "unvisited (potential orphan)" and False means "reachable". 
+ """ for i in compress(range(len(self._objects)), orphans): self._objects[i] = None + + + + + def get_reference(self, obj: PdfObject) -> IndirectObject: idnum = self._objects.index(obj) + 1 ref = IndirectObject(idnum, 0, self) diff --git a/tests/test_writer.py b/tests/test_writer.py index a0b9b3453..583d59bcf 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -2485,6 +2485,75 @@ def test_compress_identical_objects(): assert len(out2.getvalue()) > len(out3.getvalue()) +@pytest.mark.enable_socket +def test_compress_identical_objects_add_and_remove_page(): + """ + This test is a regression test for issue #3418. + + compress_identical_objects restores the PDF size to approximately the single-page + size after removing a duplicated page. + """ + SAMPLE_URLS = [ + "https://raw.githubusercontent.com/py-pdf/sample-files/main/001-trivial/minimal-document.pdf", + "https://raw.githubusercontent.com/py-pdf/sample-files/main/011-google-doc-document/google-doc-document.pdf", + ] + + for url in SAMPLE_URLS: + name = url.split("/")[-1] + in_bytes = BytesIO(get_data_from_url(url, name=name)) + + reader = PdfReader(in_bytes) + + # baseline writer with 1 copy (single-page PDF) + writer_single = PdfWriter() + writer_single.append(reader) + out_single = BytesIO() + writer_single.write(out_single) + single_len = len(out_single.getvalue()) + + # writer with 2 copies (duplicated pages) + writer_double = PdfWriter() + writer_double.append(reader) + writer_double.append(reader) + out_double = BytesIO() + writer_double.write(out_double) + double_len = len(out_double.getvalue()) + + # remove duplicated page and compress + try: + writer_double.remove_page(len(writer_double.pages) - 1) + except TypeError: + writer_double.remove_page(len(writer_double.pages) - 1, True) + + writer_double.compress_identical_objects(remove_orphans=True) + + out_after = BytesIO() + writer_double.write(out_after) + after_len = len(out_after.getvalue()) + + # tolerance: 3% of single_len or 2000 bytes, whichever is greater + tol = max(int(0.03 * single_len), 2000) + + assert abs(after_len - single_len) <= tol, ( + f"[{url}] expected size ≈ {single_len} ± {tol} bytes, got {after_len} " + f"(1p={single_len}, 2p={double_len})" + ) + + +def test_compress_identical_objects_noop(caplog): + writer = PdfWriter() + writer.add_blank_page(width=72, height=72) + + # Ejecutamos con ambos flags en False + with caplog.at_level("WARNING"): + writer.compress_identical_objects(remove_identicals=False, remove_orphans=False) + + # Verificamos que el warning se haya emitido + assert "Nothing has been done" in caplog.text + + # Verificamos que el documento sigue intacto (sigue teniendo 1 página) + assert len(writer.pages) == 1 + def test_set_need_appearances_writer(): """Minimal test for coverage""" writer = PdfWriter()