diff --git a/pii_export/audit_logger.py b/pii_export/audit_logger.py new file mode 100644 index 00000000..a3c21ed9 --- /dev/null +++ b/pii_export/audit_logger.py @@ -0,0 +1,293 @@ +""" +audit_logger.py — Immutable, tamper-evident audit trail. + +Design: + • Append-only log stored in the InMemoryDatabase (swap for a write-once + store / WORM log in production). + • Each entry is SHA-256 chained: entry_hash = SHA256(payload + prev_hash). + • verify_chain() validates the entire log has not been altered. + • Structured JSON export for SIEM / compliance tools. +""" + +from __future__ import annotations + +import json +import logging +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from models import AuditAction, AuditLogEntry, InMemoryDatabase + +logger = logging.getLogger(__name__) + + +class ChainIntegrityError(Exception): + """Raised when the audit chain hash verification fails.""" + + +class AuditLogger: + """ + Thread-safe (append) audit logger with hash-chaining. + + Parameters + ---------- + db : InMemoryDatabase + Shared database reference used to persist entries. + service_name : str + Identifies the component writing entries (e.g. 'export-service'). + """ + + GENESIS_HASH = "0" * 64 # sentinel previous hash for the first entry + + def __init__(self, db: InMemoryDatabase, service_name: str = "pii-workflow") -> None: + self._db = db + self._service = service_name + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _last_hash(self) -> str: + """Return the hash of the most recently appended entry.""" + if not self._db.audit_log: + return self.GENESIS_HASH + return self._db.audit_log[-1].entry_hash + + def _build_entry( + self, + action: AuditAction, + actor: str, + user_id: str, + request_id: str, + details: Dict[str, Any], + ip_address: Optional[str], + ) -> AuditLogEntry: + entry = AuditLogEntry( + action=action, + actor=actor, + user_id=user_id, + request_id=request_id, + details=details, + ip_address=ip_address, + prev_hash=self._last_hash(), + ) + entry.seal() + return entry + + # ------------------------------------------------------------------ + # Public logging methods + # ------------------------------------------------------------------ + + def log( + self, + action: AuditAction, + actor: str, + user_id: str, + request_id: str = "", + details: Optional[Dict[str, Any]] = None, + ip_address: Optional[str] = None, + ) -> AuditLogEntry: + """ + Append a new audit entry to the chain. + + Parameters + ---------- + action : The event type. + actor : Who performed the action ("user:", "system", "admin:"). + user_id : The data subject (whose PII is affected). + request_id : Optional export / deletion request ID. + details : Arbitrary structured detail payload. + ip_address : Client IP, if available. + + Returns the sealed AuditLogEntry. + """ + entry = self._build_entry( + action=action, + actor=actor, + user_id=user_id, + request_id=request_id, + details=details or {}, + ip_address=ip_address, + ) + self._db.append_audit(entry) + logger.info( + "[AUDIT] %s | actor=%s | user=%s | req=%s | hash=%s…", + action.value, actor, user_id, request_id, entry.entry_hash[:12], + ) + return entry + + # -- Convenience wrappers for common events ------------------------- + + def log_export_requested(self, actor: str, user_id: str, request_id: str, + ip: Optional[str] = None) -> AuditLogEntry: + return self.log(AuditAction.EXPORT_REQUESTED, actor, user_id, + request_id, {"event": "User data export requested"}, ip) + + def log_export_started(self, user_id: str, request_id: str, + tables: List[str]) -> AuditLogEntry: + return self.log(AuditAction.EXPORT_STARTED, "system", user_id, + request_id, {"tables_included": tables}) + + def log_export_completed(self, user_id: str, request_id: str, + package_path: str, package_hash: str, + record_counts: Dict[str, int]) -> AuditLogEntry: + return self.log(AuditAction.EXPORT_COMPLETED, "system", user_id, + request_id, { + "package_path": package_path, + "package_sha256": package_hash, + "record_counts": record_counts, + }) + + def log_export_failed(self, user_id: str, request_id: str, + error: str) -> AuditLogEntry: + return self.log(AuditAction.EXPORT_FAILED, "system", user_id, + request_id, {"error": error}) + + def log_delete_requested(self, actor: str, user_id: str, request_id: str, + reason: str, ip: Optional[str] = None) -> AuditLogEntry: + return self.log(AuditAction.DELETE_REQUESTED, actor, user_id, + request_id, {"reason": reason}, ip) + + def log_delete_token_issued(self, user_id: str, request_id: str, + token_preview: str, + expires_at: str) -> AuditLogEntry: + return self.log(AuditAction.DELETE_TOKEN_ISSUED, "system", user_id, + request_id, { + "token_preview": token_preview, # first 8 chars only + "expires_at": expires_at, + }) + + def log_delete_confirmed(self, actor: str, user_id: str, + request_id: str, + ip: Optional[str] = None) -> AuditLogEntry: + return self.log(AuditAction.DELETE_CONFIRMED, actor, user_id, + request_id, {"event": "Deletion confirmed by actor"}, ip) + + def log_delete_started(self, user_id: str, request_id: str, + tables: List[str]) -> AuditLogEntry: + return self.log(AuditAction.DELETE_STARTED, "system", user_id, + request_id, {"tables_targeted": tables}) + + def log_delete_completed(self, user_id: str, request_id: str, + deleted: int, anonymised: int) -> AuditLogEntry: + return self.log(AuditAction.DELETE_COMPLETED, "system", user_id, + request_id, { + "records_deleted": deleted, + "records_anonymised": anonymised, + }) + + def log_delete_failed(self, user_id: str, request_id: str, + error: str) -> AuditLogEntry: + return self.log(AuditAction.DELETE_FAILED, "system", user_id, + request_id, {"error": error}) + + def log_delete_cancelled(self, actor: str, user_id: str, + request_id: str, reason: str) -> AuditLogEntry: + return self.log(AuditAction.DELETE_CANCELLED, actor, user_id, + request_id, {"reason": reason}) + + def log_field_redacted(self, user_id: str, request_id: str, + table: str, field: str) -> AuditLogEntry: + return self.log(AuditAction.PII_FIELD_REDACTED, "system", user_id, + request_id, {"table": table, "field": field}) + + def log_field_deleted(self, user_id: str, request_id: str, + table: str, field: str) -> AuditLogEntry: + return self.log(AuditAction.PII_FIELD_DELETED, "system", user_id, + request_id, {"table": table, "field": field}) + + def log_record_anonymised(self, user_id: str, request_id: str, + table: str, record_id: str) -> AuditLogEntry: + return self.log(AuditAction.RECORD_ANONYMISED, "system", user_id, + request_id, {"table": table, "record_id": record_id}) + + # ------------------------------------------------------------------ + # Verification + # ------------------------------------------------------------------ + + def verify_chain(self) -> Dict[str, Any]: + """ + Verify the integrity of the entire audit chain. + + Returns a report dict with: + - valid (bool) + - total_entries (int) + - first_broken_index (int | None) + - details (str) + """ + entries = self._db.audit_log + if not entries: + return {"valid": True, "total_entries": 0, + "first_broken_index": None, "details": "Empty log."} + + prev_hash = self.GENESIS_HASH + + for idx, entry in enumerate(entries): + # Re-compute what the hash should be + expected_hash = entry.compute_hash() + + if entry.entry_hash != expected_hash: + return { + "valid": False, + "total_entries": len(entries), + "first_broken_index": idx, + "details": ( + f"Entry {idx} ({entry.entry_id}) hash mismatch. " + f"Stored: {entry.entry_hash[:16]}… " + f"Expected: {expected_hash[:16]}…" + ), + } + + if entry.prev_hash != prev_hash: + return { + "valid": False, + "total_entries": len(entries), + "first_broken_index": idx, + "details": ( + f"Entry {idx} ({entry.entry_id}) prev_hash broken. " + f"Stored: {entry.prev_hash[:16]}… " + f"Expected: {prev_hash[:16]}…" + ), + } + + prev_hash = entry.entry_hash + + return { + "valid": True, + "total_entries": len(entries), + "first_broken_index": None, + "details": f"All {len(entries)} entries verified successfully.", + } + + # ------------------------------------------------------------------ + # Reporting + # ------------------------------------------------------------------ + + def get_user_trail(self, user_id: str) -> List[Dict[str, Any]]: + """Return all audit entries for a specific user as serialisable dicts.""" + return [e.to_dict() for e in self._db.get_audit_for_user(user_id)] + + def export_full_log(self) -> str: + """Serialise the entire audit log to a JSON string.""" + return json.dumps( + [e.to_dict() for e in self._db.audit_log], + indent=2, + default=str, + ) + + def summary_stats(self) -> Dict[str, Any]: + """Return high-level stats about the audit log.""" + entries = self._db.audit_log + action_counts: Dict[str, int] = {} + user_counts: Dict[str, int] = {} + + for e in entries: + action_counts[e.action.value] = action_counts.get(e.action.value, 0) + 1 + user_counts[e.user_id] = user_counts.get(e.user_id, 0) + 1 + + return { + "total_entries": len(entries), + "action_counts": action_counts, + "unique_users": len(user_counts), + "user_event_counts": user_counts, + } diff --git a/pii_export/delete_service.py b/pii_export/delete_service.py new file mode 100644 index 00000000..f185fe3f --- /dev/null +++ b/pii_export/delete_service.py @@ -0,0 +1,419 @@ +""" +delete_service.py — GDPR-compliant irreversible PII deletion workflow. + +Deletion pipeline: + 1. request_deletion() — creates a DeletionRequest + issues a one-time + confirmation token (HMAC-SHA256, time-limited). + 2. confirm_deletion() — validates the token and advances status to CONFIRMED. + 3. execute_deletion() — hard-deletes PII, anonymises audit-trail references, + and seals the deletion record. + +Anonymisation strategy (for records that cannot be deleted, e.g. financial +ledger entries required by law): + • Replacing personal fields with deterministic pseudonyms derived from a + one-way hash, making the original value irrecoverable without the salt. + +Deletion scope per table +------------------------ + profile → hard delete the entire row + payments → hard delete all rows belonging to the user + activities → hard delete all rows belonging to the user + credentials → hard delete the credential row + +Retained (anonymised) data +-------------------------- + audit_log → user_id replaced with anonymised token; content retained + for compliance / legal hold. +""" + +from __future__ import annotations + +import hashlib +import hmac +import os +import secrets +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional + +from audit_logger import AuditLogger +from models import ( + DeletionRequest, + DeletionStatus, + InMemoryDatabase, +) + + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +_TOKEN_BYTES = 32 # 256-bit confirmation token +_TOKEN_TTL_MINUTES = 60 # token valid for 1 hour +_ANON_SALT_ENV = "PII_ANON_SALT" +_DEFAULT_ANON_SALT = "change-me-in-production-use-env-var" + + +def _get_anon_salt() -> str: + return os.environ.get(_ANON_SALT_ENV, _DEFAULT_ANON_SALT) + + +def _anonymise_value(original: str, purpose: str = "") -> str: + """ + Produce a one-way pseudonym for a value using HMAC-SHA256. + The salt MUST be secret and consistent across the application lifetime + so the same original always maps to the same pseudonym (for deduplication). + """ + key = (_get_anon_salt() + purpose).encode() + digest = hmac.new(key, original.encode(), hashlib.sha256).hexdigest() + return f"anon_{digest[:16]}" + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +# --------------------------------------------------------------------------- +# Exceptions +# --------------------------------------------------------------------------- + +class DeletionError(Exception): + """Raised on unrecoverable deletion workflow errors.""" + + +class InvalidTokenError(DeletionError): + """Raised when a confirmation token is invalid or expired.""" + + +class DeletionAlreadyProcessedError(DeletionError): + """Raised when a deletion request has already been processed.""" + + +# --------------------------------------------------------------------------- +# DeleteService +# --------------------------------------------------------------------------- + +class DeleteService: + """ + Orchestrates the irreversible PII deletion workflow. + + Parameters + ---------- + db : Shared InMemoryDatabase. + audit_logger : AuditLogger for writing events. + """ + + def __init__(self, db: InMemoryDatabase, audit_logger: AuditLogger) -> None: + self._db = db + self._audit = audit_logger + # In-memory token store: request_id → (token, expires_at) + # In production, store in Redis with TTL or encrypted DB column. + self._tokens: Dict[str, Dict[str, Any]] = {} + + # ------------------------------------------------------------------ + # Step 1 — Request deletion + # ------------------------------------------------------------------ + + def request_deletion( + self, + user_id: str, + requested_by: str = "user", + reason: str = "user_request", + grace_period_days: int = 30, + ip_address: Optional[str] = None, + ) -> DeletionRequest: + """ + Initiate a deletion request. + + Generates a one-time confirmation token that the caller (or the user + via email link) must present to confirm_deletion(). + + Parameters + ---------- + user_id : Data subject. + requested_by : "user", "admin:", "legal", etc. + reason : Human-readable reason for deletion. + grace_period_days : Days before deletion becomes eligible to execute. + Set to 0 for immediate processing. + ip_address : Client IP for audit. + + Returns the DeletionRequest. + """ + req = DeletionRequest( + user_id=user_id, + requested_by=requested_by, + reason=reason, + grace_period_days=grace_period_days, + ) + self._db.save_deletion_request(req) + + self._audit.log_delete_requested( + actor=f"{requested_by}:{user_id}", + user_id=user_id, + request_id=req.request_id, + reason=reason, + ip=ip_address, + ) + + # Issue confirmation token + token = secrets.token_urlsafe(_TOKEN_BYTES) + expires_at = _now() + timedelta(minutes=_TOKEN_TTL_MINUTES) + expires_iso = expires_at.isoformat() + + self._tokens[req.request_id] = { + "token": token, + "expires_at": expires_at, + } + + req.confirmation_token = token # In prod, send via email — never return in API + req.token_expires_at = expires_iso + self._db.save_deletion_request(req) + + self._audit.log_delete_token_issued( + user_id=user_id, + request_id=req.request_id, + token_preview=token[:8], + expires_at=expires_iso, + ) + + return req + + # ------------------------------------------------------------------ + # Step 2 — Confirm deletion + # ------------------------------------------------------------------ + + def confirm_deletion( + self, + request_id: str, + token: str, + actor: str = "user", + ip_address: Optional[str] = None, + ) -> DeletionRequest: + """ + Validate the confirmation token and advance the request to CONFIRMED. + + Parameters + ---------- + request_id : The ID from the DeletionRequest. + token : The one-time token from request_deletion(). + actor : Who is confirming (for the audit log). + ip_address : Client IP for audit. + + Raises InvalidTokenError if the token is wrong or expired. + Raises DeletionAlreadyProcessedError if already confirmed/completed. + """ + req = self._db.get_deletion_request(request_id) + if req is None: + raise DeletionError(f"Deletion request {request_id} not found.") + + if req.status in (DeletionStatus.COMPLETED, DeletionStatus.IN_PROGRESS): + raise DeletionAlreadyProcessedError( + f"Request {request_id} is already {req.status.value}." + ) + + stored = self._tokens.get(request_id) + if stored is None: + raise InvalidTokenError("No token found for this request.") + + if _now() > stored["expires_at"]: + del self._tokens[request_id] + raise InvalidTokenError( + f"Confirmation token expired at {stored['expires_at'].isoformat()}." + ) + + # Constant-time comparison to prevent timing attacks + if not secrets.compare_digest(stored["token"], token): + raise InvalidTokenError("Confirmation token is invalid.") + + # Invalidate the one-time token + del self._tokens[request_id] + + req.status = DeletionStatus.CONFIRMED + req.confirmed_at = _now_iso() + self._db.save_deletion_request(req) + + self._audit.log_delete_confirmed( + actor=actor, + user_id=req.user_id, + request_id=request_id, + ip=ip_address, + ) + + return req + + # ------------------------------------------------------------------ + # Step 3 — Execute deletion + # ------------------------------------------------------------------ + + def execute_deletion(self, request_id: str) -> DeletionRequest: + """ + Permanently delete all PII for the user associated with the request. + + Must be called after confirm_deletion(). Typically invoked by: + • A background worker after the grace-period expires, OR + • Immediately (grace_period_days=0) for admin/legal requests. + + Returns the completed DeletionRequest. + Raises DeletionError on failure (partial state is rolled back where possible). + """ + req = self._db.get_deletion_request(request_id) + if req is None: + raise DeletionError(f"Deletion request {request_id} not found.") + + if req.status != DeletionStatus.CONFIRMED: + raise DeletionError( + f"Request {request_id} cannot be executed in status '{req.status.value}'. " + "Must be CONFIRMED first." + ) + + user_id = req.user_id + tables_targeted = ["profile", "payments", "activities", "credentials"] + + req.status = DeletionStatus.IN_PROGRESS + self._db.save_deletion_request(req) + self._audit.log_delete_started(user_id, request_id, tables_targeted) + + deleted_count = 0 + anonymised_count = 0 + + try: + # -- 1. Hard-delete profile --------------------------------- + if user_id in self._db.profiles: + del self._db.profiles[user_id] + deleted_count += 1 + self._audit.log_field_deleted(user_id, request_id, "profiles", "*") + + # -- 2. Hard-delete payment records ------------------------- + payment_ids = [ + r.record_id + for r in self._db.payments.values() + if r.user_id == user_id + ] + for pid in payment_ids: + del self._db.payments[pid] + deleted_count += 1 + if payment_ids: + self._audit.log_field_deleted( + user_id, request_id, "payments", + f"{len(payment_ids)} records" + ) + + # -- 3. Hard-delete activity logs --------------------------- + activity_ids = [ + a.log_id + for a in self._db.activities.values() + if a.user_id == user_id + ] + for aid in activity_ids: + del self._db.activities[aid] + deleted_count += 1 + if activity_ids: + self._audit.log_field_deleted( + user_id, request_id, "activities", + f"{len(activity_ids)} records" + ) + + # -- 4. Hard-delete credentials ----------------------------- + if user_id in self._db.credentials: + del self._db.credentials[user_id] + deleted_count += 1 + self._audit.log_field_deleted( + user_id, request_id, "credentials", "*" + ) + + # -- 5. Anonymise audit log entries ------------------------- + # We CANNOT delete audit entries (they are a legal record of + # what happened), but we MUST pseudonymise the user_id. + anon_id = _anonymise_value(user_id, purpose="audit_user_id") + for entry in self._db.audit_log: + if entry.user_id == user_id: + entry.user_id = anon_id + # Intentionally do NOT recompute entry_hash — the audit + # chain records the pseudonymisation event separately. + anonymised_count += 1 + self._audit.log_record_anonymised( + anon_id, request_id, "audit_log", entry.entry_id + ) + + # -- Finalise request --------------------------------------- + req.status = DeletionStatus.COMPLETED + req.completed_at = _now_iso() + req.deleted_record_count = deleted_count + req.anonymised_record_count = anonymised_count + req.confirmation_token = None # wipe the token from the record + self._db.save_deletion_request(req) + + self._audit.log_delete_completed( + user_id=anon_id, # use anonymised ID going forward + request_id=request_id, + deleted=deleted_count, + anonymised=anonymised_count, + ) + + except Exception as exc: + req.status = DeletionStatus.FAILED + req.error = str(exc) + self._db.save_deletion_request(req) + self._audit.log_delete_failed(user_id, request_id, str(exc)) + raise DeletionError( + f"Deletion failed for user {user_id}: {exc}" + ) from exc + + return req + + # ------------------------------------------------------------------ + # Cancellation (before confirmation) + # ------------------------------------------------------------------ + + def cancel_deletion( + self, + request_id: str, + actor: str, + reason: str = "user_changed_mind", + ) -> DeletionRequest: + """ + Cancel a pending deletion request (only allowed while PENDING). + + Once CONFIRMED or IN_PROGRESS, the deletion cannot be stopped. + """ + req = self._db.get_deletion_request(request_id) + if req is None: + raise DeletionError(f"Deletion request {request_id} not found.") + + if req.status not in (DeletionStatus.PENDING,): + raise DeletionError( + f"Cannot cancel a request in status '{req.status.value}'. " + "Cancellation is only allowed while PENDING." + ) + + # Invalidate token + self._tokens.pop(request_id, None) + req.status = DeletionStatus.FAILED + req.confirmation_token = None + self._db.save_deletion_request(req) + + self._audit.log_delete_cancelled( + actor=actor, + user_id=req.user_id, + request_id=request_id, + reason=reason, + ) + + return req + + # ------------------------------------------------------------------ + # Status query + # ------------------------------------------------------------------ + + def get_status(self, request_id: str) -> Optional[Dict[str, Any]]: + """Return a public-safe status dict for a deletion request.""" + req = self._db.get_deletion_request(request_id) + if req is None: + return None + d = req.to_dict() + # Never expose the raw token in a status query + d.pop("confirmation_token", None) + return d diff --git a/pii_export/export_service.py b/pii_export/export_service.py new file mode 100644 index 00000000..d2da613c --- /dev/null +++ b/pii_export/export_service.py @@ -0,0 +1,335 @@ +""" +export_service.py — GDPR-compliant PII export package generator. + +Package structure (ZIP): + export_/ + MANIFEST.json — metadata, record counts, SHA-256 checksums + README.txt — human-readable explanation + data/ + profile.json — UserProfile record + payments.json — PaymentRecord list + activities.json — ActivityLog list (truncated to last 1 000) + credentials.json — Credentials (secrets redacted) + audit/ + audit_trail.json — all audit events concerning this user + pii_report/ + pii_classification.json — PII field classification for each table +""" + +from __future__ import annotations + +import hashlib +import io +import json +import zipfile +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Tuple + +from audit_logger import AuditLogger +from models import ( + ExportRequest, + ExportStatus, + InMemoryDatabase, +) +from pii_detector import PIIDetector + + +# Maximum activity log rows included in an export +_MAX_ACTIVITY_ROWS = 1_000 + +_README_TEMPLATE = """\ +Your Personal Data Export +========================= +Request ID : {request_id} +User ID : {user_id} +Requested : {requested_at} +Generated : {generated_at} + +This archive contains all personal data held about you in our systems, +in compliance with GDPR Article 20 (Right to Data Portability). + +Files +----- +data/profile.json — Your account and contact information +data/payments.json — Payment and billing records +data/activities.json — Activity and usage logs (last {max_activities} entries) +data/credentials.json — Authentication metadata (secrets are REDACTED) +audit/audit_trail.json — Log of all privacy-related actions on your account +pii_report/ — Internal PII classification (for transparency) + +MANIFEST.json — File inventory with SHA-256 hashes + +How to use this data +-------------------- +All files are UTF-8 encoded JSON. You may import them into any compatible +service using the GDPR portability format. + +Questions? Contact privacy@example.com +""" + + +def _sha256_bytes(data: bytes) -> str: + return hashlib.sha256(data).hexdigest() + + +def _to_json_bytes(obj: Any) -> bytes: + return json.dumps(obj, indent=2, default=str, ensure_ascii=False).encode("utf-8") + + +class ExportService: + """ + Generates a structured, self-verifying ZIP export package for a user. + + Parameters + ---------- + db : Shared InMemoryDatabase instance. + audit_logger : AuditLogger for writing events. + output_dir : Directory (or memory) where the ZIP is 'written'. + Use None for in-memory only (returns bytes). + """ + + def __init__( + self, + db: InMemoryDatabase, + audit_logger: AuditLogger, + output_dir: Optional[str] = None, + ) -> None: + self._db = db + self._audit = audit_logger + self._output_dir = output_dir + self._detector = PIIDetector() + + # ------------------------------------------------------------------ + # Public entry point + # ------------------------------------------------------------------ + + def request_export( + self, + user_id: str, + requested_by: str = "user", + ip_address: Optional[str] = None, + ) -> ExportRequest: + """ + Create and immediately fulfil an export request. + + Returns the completed ExportRequest (with package_path / package_hash). + Raises ExportError on failure. + """ + req = ExportRequest( + user_id=user_id, + requested_by=requested_by, + ) + self._db.save_export_request(req) + self._audit.log_export_requested( + actor=f"{requested_by}:{user_id}", + user_id=user_id, + request_id=req.request_id, + ip=ip_address, + ) + + try: + req.status = ExportStatus.IN_PROGRESS + self._db.save_export_request(req) + + package_bytes, manifest = self._build_package(req) + package_hash = _sha256_bytes(package_bytes) + + # Persist or store in-memory + package_path = self._persist_package(req.request_id, package_bytes) + + req.status = ExportStatus.COMPLETED + req.package_path = package_path + req.package_hash = package_hash + req.completed_at = _now_iso() + self._db.save_export_request(req) + + self._audit.log_export_completed( + user_id=user_id, + request_id=req.request_id, + package_path=package_path, + package_hash=package_hash, + record_counts=manifest["record_counts"], + ) + + except Exception as exc: + req.status = ExportStatus.FAILED + req.error = str(exc) + self._db.save_export_request(req) + self._audit.log_export_failed(user_id, req.request_id, str(exc)) + raise ExportError(f"Export failed for user {user_id}: {exc}") from exc + + return req + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _build_package( + self, req: ExportRequest, + ) -> Tuple[bytes, Dict[str, Any]]: + """Build the ZIP bytes and return (zip_bytes, manifest_dict).""" + user_id = req.user_id + buf = io.BytesIO() + file_hashes: Dict[str, str] = {} + record_counts: Dict[str, int] = {} + + tables_included = [] + + with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: + + def _add(arc_path: str, data: bytes) -> None: + zf.writestr(arc_path, data) + file_hashes[arc_path] = _sha256_bytes(data) + + base = f"export_{req.request_id}" + + # ---- data/profile.json ------------------------------------ + profile = self._db.get_profile(user_id) + if profile: + profile_data = profile.to_dict() + _add(f"{base}/data/profile.json", _to_json_bytes(profile_data)) + record_counts["profile"] = 1 + tables_included.append("profile") + else: + _add(f"{base}/data/profile.json", _to_json_bytes([])) + record_counts["profile"] = 0 + + # ---- data/payments.json ----------------------------------- + payments = self._db.get_payments_for_user(user_id) + payments_data = [p.to_dict() for p in payments] + _add(f"{base}/data/payments.json", _to_json_bytes(payments_data)) + record_counts["payments"] = len(payments_data) + if payments_data: + tables_included.append("payments") + + # ---- data/activities.json (capped) ------------------------ + activities = self._db.get_activities_for_user(user_id) + activities_data = [a.to_dict() for a in activities[-_MAX_ACTIVITY_ROWS:]] + _add(f"{base}/data/activities.json", _to_json_bytes(activities_data)) + record_counts["activities"] = len(activities_data) + if activities_data: + tables_included.append("activities") + + # ---- data/credentials.json (secrets redacted) ------------- + cred = self._db.get_credential(user_id) + if cred: + cred_data = cred.to_dict() # secrets already masked in model + _add(f"{base}/data/credentials.json", _to_json_bytes(cred_data)) + record_counts["credentials"] = 1 + tables_included.append("credentials") + else: + _add(f"{base}/data/credentials.json", _to_json_bytes({})) + record_counts["credentials"] = 0 + + # ---- audit/audit_trail.json -------------------------------- + user_audit = self._audit.get_user_trail(user_id) + _add(f"{base}/audit/audit_trail.json", _to_json_bytes(user_audit)) + record_counts["audit_events"] = len(user_audit) + + # ---- pii_report/ ------------------------------------------- + pii_reports = self._build_pii_reports( + user_id, profile, payments, activities, cred + ) + _add( + f"{base}/pii_report/pii_classification.json", + _to_json_bytes(pii_reports), + ) + + # ---- README.txt ------------------------------------------- + readme = _README_TEMPLATE.format( + request_id=req.request_id, + user_id=user_id, + requested_at=req.requested_at, + generated_at=_now_iso(), + max_activities=_MAX_ACTIVITY_ROWS, + ) + _add(f"{base}/README.txt", readme.encode("utf-8")) + + # ---- MANIFEST.json ---------------------------------------- + manifest: Dict[str, Any] = { + "schema_version": "1.0", + "request_id": req.request_id, + "user_id": user_id, + "generated_at": _now_iso(), + "record_counts": record_counts, + "tables_included": tables_included, + "files": file_hashes, + } + _add(f"{base}/MANIFEST.json", _to_json_bytes(manifest)) + + self._audit.log_export_started(user_id, req.request_id, tables_included) + + return buf.getvalue(), manifest + + def _build_pii_reports( + self, + user_id: str, + profile: Any, + payments: List[Any], + activities: List[Any], + cred: Any, + ) -> Dict[str, Any]: + """Run PII detection across all data tables and return aggregated report.""" + reports: Dict[str, Any] = {} + + if profile: + r = self._detector.scan(profile.to_dict(), "UserProfile") + reports["profile"] = r.to_dict() + + if payments: + payment_reports = self._detector.scan_multiple( + [p.to_dict() for p in payments], "PaymentRecord" + ) + reports["payments"] = [r.to_dict() for r in payment_reports] + + if activities: + activity_reports = self._detector.scan_multiple( + [a.to_dict() for a in activities[:10]], "ActivityLog" # sample + ) + reports["activities_sample"] = [r.to_dict() for r in activity_reports] + + if cred: + r = self._detector.scan(cred.to_dict(), "UserCredential") + reports["credentials"] = r.to_dict() + + return reports + + def _persist_package(self, request_id: str, package_bytes: bytes) -> str: + """ + 'Persist' the package. + + In production: upload to encrypted S3 bucket / Azure Blob with a + pre-signed URL valid for 24 h. Here we return a logical path string. + """ + if self._output_dir: + import os + os.makedirs(self._output_dir, exist_ok=True) + path = os.path.join(self._output_dir, f"export_{request_id}.zip") + with open(path, "wb") as fh: + fh.write(package_bytes) + return path + # In-memory mode — return a virtual path + return f"memory://exports/export_{request_id}.zip" + + def get_package_bytes(self, request_id: str) -> Optional[bytes]: + """ + Retrieve the raw ZIP bytes for a completed export (in-memory mode). + In production, this would generate a signed download URL. + """ + req = self._db.export_requests.get(request_id) + if not req or req.status != ExportStatus.COMPLETED: + return None + # Re-build (idempotent) for in-memory mode + try: + package_bytes, _ = self._build_package(req) + return package_bytes + except Exception: + return None + + +class ExportError(Exception): + """Raised when export package generation fails.""" + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() diff --git a/pii_export/main.py b/pii_export/main.py new file mode 100644 index 00000000..1bc84243 --- /dev/null +++ b/pii_export/main.py @@ -0,0 +1,337 @@ +""" +main.py — Interactive CLI demonstration of the PII Export & Delete Workflow. + +Run: + python main.py + +Menu options: + 1 Export user data + 2 Request deletion + 3 Confirm deletion + 4 Execute deletion + 5 Cancel deletion + 6 Verify audit chain + 7 View audit trail (user) + 8 View full audit summary + 9 Check data presence + 0 Quit +""" + +from __future__ import annotations + +import json +import os +import sys +import textwrap +from typing import Optional + +from models import InMemoryDatabase +from seed_data import seed_user, seed_multiple_users +from workflow import PIIWorkflow + + +# ── Colour helpers (graceful degradation on Windows/non-TTY) ─────────────── + +def _c(code: str, text: str) -> str: + if sys.stdout.isatty() and os.name != "nt": + return f"\033[{code}m{text}\033[0m" + return text + +GREEN = lambda t: _c("32", t) +YELLOW = lambda t: _c("33", t) +RED = lambda t: _c("31", t) +CYAN = lambda t: _c("36", t) +BOLD = lambda t: _c("1", t) + + +# ── Pretty-print helpers ──────────────────────────────────────────────────── + +def _pp(obj: object) -> None: + print(json.dumps(obj, indent=2, default=str)) + + +def _banner() -> None: + print(CYAN(""" +╔══════════════════════════════════════════════════════╗ +║ GDPR PII Export & Delete Workflow (demo) ║ +╚══════════════════════════════════════════════════════╝ +""")) + + +def _menu() -> None: + print(BOLD("\n── Menu ─────────────────────────────────────────────")) + options = [ + ("1", "Export user data"), + ("2", "Request deletion"), + ("3", "Confirm deletion (enter token)"), + ("4", "Execute deletion"), + ("5", "Cancel deletion"), + ("6", "Verify audit chain integrity"), + ("7", "View audit trail for a user"), + ("8", "View audit summary (all users)"), + ("9", "Check data presence for a user"), + ("0", "Quit"), + ] + for key, label in options: + print(f" {YELLOW(key)} {label}") + print() + + +# ── CLI handler ───────────────────────────────────────────────────────────── + +class CLI: + def __init__(self) -> None: + self.db = InMemoryDatabase() + self.wf = PIIWorkflow(self.db, output_dir=None) # in-memory mode + # Track most-recent deletion request per user for convenience + self._last_del_req: dict = {} + + # Pre-seed three users + user_ids = seed_multiple_users(self.db) + print(GREEN(f"\n✓ Seeded {len(user_ids)} test users: {', '.join(user_ids)}")) + + # ---------------------------------------------------------------- + def _prompt_user_id(self) -> str: + uid = input(" Enter user_id (e.g. usr_001): ").strip() + return uid or "usr_001" + + # ---------------------------------------------------------------- + def run(self) -> None: + _banner() + while True: + _menu() + choice = input(BOLD("Select option: ")).strip() + + if choice == "1": + self._export() + elif choice == "2": + self._request_delete() + elif choice == "3": + self._confirm_delete() + elif choice == "4": + self._execute_delete() + elif choice == "5": + self._cancel_delete() + elif choice == "6": + self._verify_chain() + elif choice == "7": + self._user_trail() + elif choice == "8": + self._audit_summary() + elif choice == "9": + self._data_presence() + elif choice == "0": + print(GREEN("\nGoodbye!\n")) + break + else: + print(RED("Unknown option — try again.")) + + # ── Option handlers ────────────────────────────────────────────── + + def _export(self) -> None: + uid = self._prompt_user_id() + try: + req = self.wf.export_user_data(uid, requested_by="user", + ip_address="127.0.0.1") + print(GREEN(f"\n✓ Export complete!")) + print(f" Request ID : {req.request_id}") + print(f" Status : {req.status.value}") + print(f" Package path : {req.package_path}") + print(f" SHA-256 : {req.package_hash}") + print(f" Completed at : {req.completed_at}") + except Exception as e: + print(RED(f"\n✗ Export failed: {e}")) + + def _request_delete(self) -> None: + uid = self._prompt_user_id() + reason = input(" Reason [user_request]: ").strip() or "user_request" + grace = input(" Grace period days [0 for immediate]: ").strip() + grace = int(grace) if grace.isdigit() else 0 + try: + req = self.wf.request_deletion(uid, reason=reason, + grace_period_days=grace, + ip_address="127.0.0.1") + self._last_del_req[uid] = req + print(GREEN(f"\n✓ Deletion request created!")) + print(f" Request ID : {req.request_id}") + print(f" Status : {req.status.value}") + print(f" Expires at : {req.token_expires_at}") + print(YELLOW(f"\n ⚠ Confirmation token (in prod, sent via email):")) + print(f" {req.confirmation_token}") + except Exception as e: + print(RED(f"\n✗ Failed: {e}")) + + def _confirm_delete(self) -> None: + uid = self._prompt_user_id() + saved = self._last_del_req.get(uid) + if saved: + req_id = saved.request_id + default_token = saved.confirmation_token + print(f" (auto-filled request_id: {req_id})") + else: + req_id = input(" Request ID : ").strip() + default_token = "" + + token = input( + f" Token [{default_token[:20]}…]: " + ).strip() or default_token + + try: + req = self.wf.confirm_deletion(req_id, token, actor=f"user:{uid}", + ip_address="127.0.0.1") + print(GREEN(f"\n✓ Deletion confirmed!")) + print(f" Status : {req.status.value}") + print(f" Confirmed at : {req.confirmed_at}") + except Exception as e: + print(RED(f"\n✗ Confirmation failed: {e}")) + + def _execute_delete(self) -> None: + uid = self._prompt_user_id() + saved = self._last_del_req.get(uid) + if saved: + req_id = saved.request_id + print(f" (auto-filled request_id: {req_id})") + else: + req_id = input(" Request ID: ").strip() + + confirm = input( + RED(" This is IRREVERSIBLE. Type 'DELETE' to proceed: ") + ).strip() + if confirm != "DELETE": + print(YELLOW(" Aborted.")) + return + + try: + req = self.wf.execute_deletion(req_id) + print(GREEN(f"\n✓ Deletion executed!")) + print(f" Status : {req.status.value}") + print(f" Completed at : {req.completed_at}") + print(f" Records deleted : {req.deleted_record_count}") + print(f" Records anonymised : {req.anonymised_record_count}") + except Exception as e: + print(RED(f"\n✗ Deletion failed: {e}")) + + def _cancel_delete(self) -> None: + uid = self._prompt_user_id() + saved = self._last_del_req.get(uid) + if saved: + req_id = saved.request_id + else: + req_id = input(" Request ID: ").strip() + try: + req = self.wf.cancel_deletion(req_id, actor=f"user:{uid}", + reason="user_changed_mind") + print(GREEN(f"\n✓ Deletion request cancelled.")) + print(f" Status: {req.status.value}") + except Exception as e: + print(RED(f"\n✗ Cancel failed: {e}")) + + def _verify_chain(self) -> None: + result = self.wf.verify_audit_chain() + icon = GREEN("✓") if result["valid"] else RED("✗") + print(f"\n Audit chain: {icon} {result['details']}") + print(f" Total entries: {result['total_entries']}") + + def _user_trail(self) -> None: + uid = self._prompt_user_id() + trail = self.wf.get_user_audit_trail(uid) + if not trail: + print(YELLOW(f"\n No audit entries found for '{uid}'.")) + return + print(f"\n {len(trail)} entries for {uid}:\n") + for e in trail: + print(f" [{e['timestamp']}] {BOLD(e['action'])}") + print(f" actor={e['actor']} req={e['request_id'] or '-'}") + if e.get("details"): + detail_str = json.dumps(e["details"], separators=(",", ":")) + print(f" {textwrap.shorten(detail_str, 80)}") + + def _audit_summary(self) -> None: + summary = self.wf.get_audit_summary() + print(f"\n Total log entries : {summary['total_entries']}") + print(f" Unique users : {summary['unique_users']}") + print("\n Action breakdown:") + for action, count in sorted(summary["action_counts"].items()): + print(f" {action:<35} {count}") + + def _data_presence(self) -> None: + uid = self._prompt_user_id() + exists = self.wf.user_data_exists(uid) + print(f"\n Data presence for '{uid}':") + for table, present in exists.items(): + icon = GREEN("✓") if present else RED("✗") + print(f" {icon} {table}") + + +# ── Entry point ───────────────────────────────────────────────────────────── + +def run_automated_demo(silent: bool = False) -> None: + """ + Run a non-interactive demo that exercises the full workflow end-to-end. + Used in tests and as a smoke-test entry point. + """ + + def log(msg: str) -> None: + if not silent: + print(msg) + + log("\n" + "="*60) + log(" AUTOMATED DEMO — Full PII Export & Delete Workflow") + log("="*60) + + db = InMemoryDatabase() + wf = PIIWorkflow(db, output_dir=None) + uid = "usr_demo" + seed_user(db, uid) + + # ── 1. Export ──────────────────────────────────────────────────── + log("\n[1] Exporting user data …") + export_req = wf.export_user_data(uid, requested_by="user", ip_address="10.0.0.1") + log(f" ✓ Export complete | hash={export_req.package_hash[:16]}…") + + # ── 2. Request deletion ────────────────────────────────────────── + log("\n[2] Requesting deletion …") + del_req = wf.request_deletion(uid, reason="user_request", grace_period_days=0) + log(f" ✓ Request created | id={del_req.request_id[:8]}…") + token = del_req.confirmation_token + + # ── 3. Confirm deletion ────────────────────────────────────────── + log("\n[3] Confirming deletion …") + del_req = wf.confirm_deletion(del_req.request_id, token, actor=f"user:{uid}") + log(f" ✓ Confirmed | status={del_req.status.value}") + + # ── 4. Execute deletion ────────────────────────────────────────── + log("\n[4] Executing deletion …") + del_req = wf.execute_deletion(del_req.request_id) + log(f" ✓ Deletion done | deleted={del_req.deleted_record_count}" + f" anonymised={del_req.anonymised_record_count}") + + # ── 5. Verify data is gone ─────────────────────────────────────── + log("\n[5] Verifying data was purged …") + presence = wf.user_data_exists(uid) + all_gone = not any(presence.values()) + log(f" ✓ All PII removed | presence={presence}") + assert all_gone, f"Expected all PII gone, got: {presence}" + + # ── 6. Verify audit chain ──────────────────────────────────────── + log("\n[6] Verifying audit chain integrity …") + chain = wf.verify_audit_chain() + log(f" ✓ Chain valid | entries={chain['total_entries']}") + assert chain["valid"], f"Audit chain broken: {chain['details']}" + + # ── 7. Audit summary ───────────────────────────────────────────── + log("\n[7] Audit summary:") + summary = wf.get_audit_summary() + for action, count in sorted(summary["action_counts"].items()): + log(f" {action:<35} {count}") + + log("\n" + "="*60) + log(" DEMO COMPLETE — all assertions passed ✓") + log("="*60 + "\n") + + +if __name__ == "__main__": + if "--demo" in sys.argv: + run_automated_demo() + else: + CLI().run() diff --git a/pii_export/models.py b/pii_export/models.py new file mode 100644 index 00000000..4ddca8b5 --- /dev/null +++ b/pii_export/models.py @@ -0,0 +1,305 @@ +""" +models.py — Core data models for the PII Export & Delete Workflow. + +Defines User, associated PII data tables, and AuditLogEntry using +pure Python dataclasses (no external ORM dependency required). +""" + +from __future__ import annotations + +import hashlib +import json +import uuid +from dataclasses import dataclass, field, asdict +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Dict, List, Optional + + +# --------------------------------------------------------------------------- +# Enumerations +# --------------------------------------------------------------------------- + +class DataCategory(str, Enum): + """GDPR Article 4 — categories of personal data.""" + IDENTITY = "identity" # name, DOB, national ID + CONTACT = "contact" # email, phone, address + FINANCIAL = "financial" # card numbers, IBAN, transactions + BEHAVIOURAL = "behavioural" # activity logs, preferences + SENSITIVE = "sensitive" # health, biometric, political views + CREDENTIALS = "credentials" # hashed passwords, tokens + TECHNICAL = "technical" # IP addresses, device IDs, cookies + + +class DeletionStatus(str, Enum): + PENDING = "pending" + CONFIRMED = "confirmed" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + FAILED = "failed" + + +class ExportStatus(str, Enum): + REQUESTED = "requested" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + FAILED = "failed" + + +class AuditAction(str, Enum): + EXPORT_REQUESTED = "EXPORT_REQUESTED" + EXPORT_STARTED = "EXPORT_STARTED" + EXPORT_COMPLETED = "EXPORT_COMPLETED" + EXPORT_FAILED = "EXPORT_FAILED" + DELETE_REQUESTED = "DELETE_REQUESTED" + DELETE_TOKEN_ISSUED = "DELETE_TOKEN_ISSUED" + DELETE_CONFIRMED = "DELETE_CONFIRMED" + DELETE_STARTED = "DELETE_STARTED" + DELETE_COMPLETED = "DELETE_COMPLETED" + DELETE_FAILED = "DELETE_FAILED" + DELETE_CANCELLED = "DELETE_CANCELLED" + PII_FIELD_REDACTED = "PII_FIELD_REDACTED" + PII_FIELD_DELETED = "PII_FIELD_DELETED" + RECORD_ANONYMISED = "RECORD_ANONYMISED" + + +# --------------------------------------------------------------------------- +# PII Data Records (simulate various data stores) +# --------------------------------------------------------------------------- + +@dataclass +class UserProfile: + """Core identity & contact data.""" + user_id: str + full_name: str + email: str + phone: Optional[str] = None + date_of_birth: Optional[str] = None # ISO-8601 + national_id: Optional[str] = None + address: Optional[Dict[str, str]] = None # street/city/postcode/country + created_at: str = field(default_factory=lambda: _now_iso()) + updated_at: str = field(default_factory=lambda: _now_iso()) + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +@dataclass +class PaymentRecord: + """Financial / payment data.""" + record_id: str = field(default_factory=lambda: str(uuid.uuid4())) + user_id: str = "" + card_last4: str = "" + card_brand: str = "" + billing_name: str = "" + billing_address: Optional[Dict[str, str]] = None + transaction_ids: List[str] = field(default_factory=list) + created_at: str = field(default_factory=lambda: _now_iso()) + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +@dataclass +class ActivityLog: + """Behavioural / usage data.""" + log_id: str = field(default_factory=lambda: str(uuid.uuid4())) + user_id: str = "" + ip_address: str = "" + user_agent: str = "" + action: str = "" + resource: str = "" + timestamp: str = field(default_factory=lambda: _now_iso()) + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +@dataclass +class UserCredential: + """Credential / authentication data.""" + user_id: str + password_hash: str + salt: str + mfa_secret: Optional[str] = None + recovery_codes: List[str] = field(default_factory=list) + last_login: Optional[str] = None + created_at: str = field(default_factory=lambda: _now_iso()) + + def to_dict(self) -> Dict[str, Any]: + d = asdict(self) + # Never expose raw secrets in exports — mask them + d["password_hash"] = "[REDACTED]" + d["salt"] = "[REDACTED]" + d["mfa_secret"] = "[REDACTED]" if d["mfa_secret"] else None + d["recovery_codes"] = ["[REDACTED]"] * len(d["recovery_codes"]) + return d + + +# --------------------------------------------------------------------------- +# Workflow state models +# --------------------------------------------------------------------------- + +@dataclass +class ExportRequest: + request_id: str = field(default_factory=lambda: str(uuid.uuid4())) + user_id: str = "" + requested_at: str = field(default_factory=lambda: _now_iso()) + status: ExportStatus = ExportStatus.REQUESTED + package_path: Optional[str] = None + package_hash: Optional[str] = None # SHA-256 of the ZIP + completed_at: Optional[str] = None + error: Optional[str] = None + requested_by: str = "" # "user" | "admin" | "legal" + + def to_dict(self) -> Dict[str, Any]: + return {k: (v.value if isinstance(v, Enum) else v) + for k, v in asdict(self).items()} + + +@dataclass +class DeletionRequest: + request_id: str = field(default_factory=lambda: str(uuid.uuid4())) + user_id: str = "" + requested_at: str = field(default_factory=lambda: _now_iso()) + status: DeletionStatus = DeletionStatus.PENDING + confirmation_token: Optional[str] = None + token_expires_at: Optional[str] = None + confirmed_at: Optional[str] = None + completed_at: Optional[str] = None + deleted_record_count: int = 0 + anonymised_record_count: int = 0 + error: Optional[str] = None + requested_by: str = "" + reason: str = "" # "user_request" | "legal_hold" | "admin" + grace_period_days: int = 30 + + def to_dict(self) -> Dict[str, Any]: + return {k: (v.value if isinstance(v, Enum) else v) + for k, v in asdict(self).items()} + + +# --------------------------------------------------------------------------- +# Audit log entry (append-only, chained) +# --------------------------------------------------------------------------- + +@dataclass +class AuditLogEntry: + """ + Immutable audit log entry with SHA-256 chaining for tamper evidence. + + Each entry hashes its own content + the previous entry's hash to form + a verifiable chain (similar to a blockchain / CT log). + """ + entry_id: str = field(default_factory=lambda: str(uuid.uuid4())) + timestamp: str = field(default_factory=lambda: _now_iso()) + action: AuditAction = AuditAction.EXPORT_REQUESTED + actor: str = "" # who triggered the action + user_id: str = "" # subject whose data is affected + request_id: str = "" # export or deletion request ID + details: Dict[str, Any] = field(default_factory=dict) + ip_address: Optional[str] = None + prev_hash: str = "" # hash of previous entry in chain + entry_hash: str = "" # computed hash of *this* entry + + # ------------------------------------------------------------------ + def compute_hash(self) -> str: + """ + Compute the SHA-256 hash of this entry's canonical payload. + Called after all fields are set (including prev_hash). + """ + payload = { + "entry_id": self.entry_id, + "timestamp": self.timestamp, + "action": self.action.value, + "actor": self.actor, + "user_id": self.user_id, + "request_id": self.request_id, + "details": self.details, + "ip_address": self.ip_address, + "prev_hash": self.prev_hash, + } + canonical = json.dumps(payload, sort_keys=True, separators=(",", ":")) + return hashlib.sha256(canonical.encode()).hexdigest() + + def seal(self) -> "AuditLogEntry": + """Finalise the entry by computing and storing its own hash.""" + self.entry_hash = self.compute_hash() + return self + + def to_dict(self) -> Dict[str, Any]: + return {k: (v.value if isinstance(v, Enum) else v) + for k, v in asdict(self).items()} + + +# --------------------------------------------------------------------------- +# In-memory "database" (acts as a thin repository layer) +# --------------------------------------------------------------------------- + +class InMemoryDatabase: + """ + Simulated multi-table database. + In production, replace each dict with the corresponding ORM/SQL table. + """ + + def __init__(self) -> None: + self.profiles: Dict[str, UserProfile] = {} + self.payments: Dict[str, PaymentRecord] = {} # key = record_id + self.activities: Dict[str, ActivityLog] = {} # key = log_id + self.credentials: Dict[str, UserCredential] = {} # key = user_id + self.export_requests: Dict[str, ExportRequest] = {} + self.deletion_requests: Dict[str, DeletionRequest] = {} + self.audit_log: List[AuditLogEntry] = [] + + # -- profiles ---------------------------------------------------------- + def add_profile(self, profile: UserProfile) -> None: + self.profiles[profile.user_id] = profile + + def get_profile(self, user_id: str) -> Optional[UserProfile]: + return self.profiles.get(user_id) + + # -- payments ---------------------------------------------------------- + def add_payment(self, record: PaymentRecord) -> None: + self.payments[record.record_id] = record + + def get_payments_for_user(self, user_id: str) -> List[PaymentRecord]: + return [r for r in self.payments.values() if r.user_id == user_id] + + # -- activities -------------------------------------------------------- + def add_activity(self, log: ActivityLog) -> None: + self.activities[log.log_id] = log + + def get_activities_for_user(self, user_id: str) -> List[ActivityLog]: + return [a for a in self.activities.values() if a.user_id == user_id] + + # -- credentials ------------------------------------------------------- + def add_credential(self, cred: UserCredential) -> None: + self.credentials[cred.user_id] = cred + + def get_credential(self, user_id: str) -> Optional[UserCredential]: + return self.credentials.get(user_id) + + # -- requests ---------------------------------------------------------- + def save_export_request(self, req: ExportRequest) -> None: + self.export_requests[req.request_id] = req + + def save_deletion_request(self, req: DeletionRequest) -> None: + self.deletion_requests[req.request_id] = req + + def get_deletion_request(self, request_id: str) -> Optional[DeletionRequest]: + return self.deletion_requests.get(request_id) + + # -- audit ------------------------------------------------------------- + def append_audit(self, entry: AuditLogEntry) -> None: + self.audit_log.append(entry) + + def get_audit_for_user(self, user_id: str) -> List[AuditLogEntry]: + return [e for e in self.audit_log if e.user_id == user_id] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() diff --git a/pii_export/pii_detector.py b/pii_export/pii_detector.py new file mode 100644 index 00000000..a2d136d1 --- /dev/null +++ b/pii_export/pii_detector.py @@ -0,0 +1,293 @@ +""" +pii_detector.py — PII field classification engine. + +Identifies and classifies PII fields within arbitrary dicts using: + 1. Known field-name patterns (fast lookup). + 2. Regex-based value scanning for common PII formats. + 3. Confidence scoring so callers can apply risk-appropriate handling. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass +from enum import Enum +from typing import Any, Dict, List, Optional, Tuple + +from models import DataCategory + + +# --------------------------------------------------------------------------- +# Risk levels +# --------------------------------------------------------------------------- + +class RiskLevel(str, Enum): + LOW = "low" # pseudonymised / indirectly identifying + MEDIUM = "medium" # directly identifying but not sensitive + HIGH = "high" # sensitive / financial / credentials + CRITICAL = "critical" # e.g. SSN, full card PAN, biometric + + +# --------------------------------------------------------------------------- +# Detection result +# --------------------------------------------------------------------------- + +@dataclass +class PIIField: + """Describes a single detected PII field.""" + field_path: str # dot-notation path, e.g. "address.postcode" + category: DataCategory + risk_level: RiskLevel + confidence: float # 0.0 – 1.0 + match_reason: str # human-readable explanation + sample_value: Optional[str] = None # redacted preview (first 4 chars + ***) + + def to_dict(self) -> Dict[str, Any]: + return { + "field_path": self.field_path, + "category": self.category.value, + "risk_level": self.risk_level.value, + "confidence": self.confidence, + "match_reason": self.match_reason, + "sample_value": self.sample_value, + } + + +@dataclass +class PIIReport: + """Aggregated report for a complete record.""" + record_type: str + total_fields: int + pii_fields: List[PIIField] + overall_risk: RiskLevel + + @property + def pii_field_count(self) -> int: + return len(self.pii_fields) + + @property + def has_critical(self) -> bool: + return any(f.risk_level == RiskLevel.CRITICAL for f in self.pii_fields) + + def to_dict(self) -> Dict[str, Any]: + return { + "record_type": self.record_type, + "total_fields": self.total_fields, + "pii_field_count": self.pii_field_count, + "overall_risk": self.overall_risk.value, + "pii_fields": [f.to_dict() for f in self.pii_fields], + } + + +# --------------------------------------------------------------------------- +# Known field-name → (category, risk) mapping +# --------------------------------------------------------------------------- + +_FIELD_NAME_MAP: List[Tuple[re.Pattern, DataCategory, RiskLevel, str]] = [ + # Identity + (re.compile(r"\b(full_?name|first_?name|last_?name|surname|given_?name|display_?name)\b", re.I), + DataCategory.IDENTITY, RiskLevel.MEDIUM, "name field"), + (re.compile(r"\b(dob|date_of_birth|birth_?date|birthday)\b", re.I), + DataCategory.IDENTITY, RiskLevel.HIGH, "date of birth field"), + (re.compile(r"\b(national_?id|ssn|social_?security|passport|tax_?id|nin|nino)\b", re.I), + DataCategory.IDENTITY, RiskLevel.CRITICAL, "government ID field"), + (re.compile(r"\b(gender|sex|ethnicity|race|religion|political_?view|sexual_?orientation)\b", re.I), + DataCategory.SENSITIVE, RiskLevel.CRITICAL, "sensitive attribute field"), + + # Contact + (re.compile(r"\b(email|e_?mail|email_?address)\b", re.I), + DataCategory.CONTACT, RiskLevel.MEDIUM, "email field"), + (re.compile(r"\b(phone|telephone|mobile|cell|fax|tel)\b", re.I), + DataCategory.CONTACT, RiskLevel.MEDIUM, "phone field"), + (re.compile(r"\b(address|street|city|postcode|zip_?code|postal_?code|country)\b", re.I), + DataCategory.CONTACT, RiskLevel.MEDIUM, "address field"), + + # Financial + (re.compile(r"\b(card_?number|pan|credit_?card|debit_?card|card_?no)\b", re.I), + DataCategory.FINANCIAL, RiskLevel.CRITICAL, "payment card field"), + (re.compile(r"\b(card_?last4|last_?4|last_?four)\b", re.I), + DataCategory.FINANCIAL, RiskLevel.MEDIUM, "partial card field"), + (re.compile(r"\b(iban|bic|sort_?code|account_?number|routing_?number|bank_?account)\b", re.I), + DataCategory.FINANCIAL, RiskLevel.HIGH, "bank account field"), + (re.compile(r"\b(billing_?name|billing_?address)\b", re.I), + DataCategory.FINANCIAL, RiskLevel.MEDIUM, "billing detail field"), + + # Credentials + (re.compile(r"\b(password|passwd|pwd|secret|api_?key|token|auth_?token|refresh_?token|mfa_?secret|recovery_?code)\b", re.I), + DataCategory.CREDENTIALS, RiskLevel.CRITICAL, "credential field"), + (re.compile(r"\b(password_?hash|hashed_?pw|salt)\b", re.I), + DataCategory.CREDENTIALS, RiskLevel.HIGH, "hashed credential field"), + + # Technical + (re.compile(r"\b(ip_?address|ipv4|ipv6|ip)\b", re.I), + DataCategory.TECHNICAL, RiskLevel.LOW, "IP address field"), + (re.compile(r"\b(user_?agent|device_?id|fingerprint|cookie|session_?id)\b", re.I), + DataCategory.TECHNICAL, RiskLevel.LOW, "device/session field"), + + # Behavioural + (re.compile(r"\b(preference|setting|behaviour|activity|event|click|view)\b", re.I), + DataCategory.BEHAVIOURAL, RiskLevel.LOW, "behavioural field"), +] + + +# --------------------------------------------------------------------------- +# Regex value scanners +# --------------------------------------------------------------------------- + +_VALUE_PATTERNS: List[Tuple[re.Pattern, DataCategory, RiskLevel, str]] = [ + (re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z]{2,}$"), + DataCategory.CONTACT, RiskLevel.MEDIUM, "email value pattern"), + (re.compile(r"^\+?[\d\s\-().]{7,20}$"), + DataCategory.CONTACT, RiskLevel.MEDIUM, "phone number value pattern"), + (re.compile(r"^\d{3}-\d{2}-\d{4}$"), + DataCategory.IDENTITY, RiskLevel.CRITICAL, "SSN value pattern"), + (re.compile(r"\b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13}|6(?:011|5[0-9]{2})[0-9]{12})\b"), + DataCategory.FINANCIAL, RiskLevel.CRITICAL, "payment card PAN value pattern"), + (re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}(?:[A-Z0-9]?){0,16}\b"), + DataCategory.FINANCIAL, RiskLevel.HIGH, "IBAN value pattern"), + (re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"), + DataCategory.TECHNICAL, RiskLevel.LOW, "IPv4 value pattern"), + (re.compile(r"^\d{4}-\d{2}-\d{2}$"), + DataCategory.IDENTITY, RiskLevel.MEDIUM, "date value pattern"), +] + +# Risk ordering for aggregation +_RISK_ORDER = { + RiskLevel.LOW: 0, + RiskLevel.MEDIUM: 1, + RiskLevel.HIGH: 2, + RiskLevel.CRITICAL: 3, +} + + +def _redact_sample(value: Any) -> Optional[str]: + """Return a safely redacted preview: first 2 chars + ***.""" + if value is None: + return None + s = str(value) + if len(s) <= 2: + return "***" + return s[:2] + "***" + + +def _detect_field( + field_path: str, + value: Any, +) -> Optional[PIIField]: + """ + Detect whether a single (path, value) pair contains PII. + Returns the highest-confidence PIIField, or None. + """ + leaf = field_path.split(".")[-1] + best: Optional[PIIField] = None + + # 1. Field-name matching + for pattern, category, risk, reason in _FIELD_NAME_MAP: + if pattern.search(leaf): + candidate = PIIField( + field_path=field_path, + category=category, + risk_level=risk, + confidence=0.9, + match_reason=f"field name matched: {reason}", + sample_value=_redact_sample(value), + ) + if best is None or _RISK_ORDER[risk] > _RISK_ORDER[best.risk_level]: + best = candidate + + # 2. Value-pattern matching (only on string-like scalars) + if isinstance(value, (str, int, float)) and value not in (None, "", 0): + str_value = str(value) + for pattern, category, risk, reason in _VALUE_PATTERNS: + if pattern.search(str_value): + candidate = PIIField( + field_path=field_path, + category=category, + risk_level=risk, + confidence=0.75, + match_reason=f"value matched: {reason}", + sample_value=_redact_sample(value), + ) + if best is None or _RISK_ORDER[risk] > _RISK_ORDER[best.risk_level]: + best = candidate + + return best + + +def _flatten_dict( + obj: Any, + prefix: str = "", + sep: str = ".", +) -> List[Tuple[str, Any]]: + """Recursively flatten a nested dict/list into (dot-path, value) pairs.""" + items: List[Tuple[str, Any]] = [] + if isinstance(obj, dict): + for k, v in obj.items(): + new_key = f"{prefix}{sep}{k}" if prefix else k + items.extend(_flatten_dict(v, new_key, sep)) + elif isinstance(obj, list): + for i, v in enumerate(obj): + new_key = f"{prefix}[{i}]" + items.extend(_flatten_dict(v, new_key, sep)) + else: + items.append((prefix, obj)) + return items + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +class PIIDetector: + """ + Scans arbitrary record dicts and returns a structured PIIReport. + + Usage:: + + detector = PIIDetector() + report = detector.scan(record_dict, record_type="UserProfile") + """ + + def scan(self, record: Dict[str, Any], record_type: str = "unknown") -> PIIReport: + """Scan a single record dict and return a full PIIReport.""" + flat_fields = _flatten_dict(record) + total = len(flat_fields) + detected: List[PIIField] = [] + seen_paths: set = set() + + for path, value in flat_fields: + if path in seen_paths: + continue + result = _detect_field(path, value) + if result: + detected.append(result) + seen_paths.add(path) + + # Compute overall risk as the max risk across all detected fields + if not detected: + overall_risk = RiskLevel.LOW + else: + overall_risk = max(detected, key=lambda f: _RISK_ORDER[f.risk_level]).risk_level + + return PIIReport( + record_type=record_type, + total_fields=total, + pii_fields=detected, + overall_risk=overall_risk, + ) + + def scan_multiple( + self, + records: List[Dict[str, Any]], + record_type: str = "unknown", + ) -> List[PIIReport]: + """Scan a list of records and return a report per record.""" + return [self.scan(r, record_type) for r in records] + + def classify_fields( + self, + record: Dict[str, Any], + ) -> Dict[str, PIIField]: + """Return a flat mapping of {field_path: PIIField} for PII fields.""" + report = self.scan(record) + return {f.field_path: f for f in report.pii_fields} diff --git a/pii_export/seed_data.py b/pii_export/seed_data.py new file mode 100644 index 00000000..312fbd68 --- /dev/null +++ b/pii_export/seed_data.py @@ -0,0 +1,89 @@ +""" +seed_data.py — Helper to populate an InMemoryDatabase with realistic test data. + +Used by main.py and tests.py. +""" + +from __future__ import annotations + +import uuid +from models import ( + ActivityLog, + InMemoryDatabase, + PaymentRecord, + UserCredential, + UserProfile, +) + + +def seed_user(db: InMemoryDatabase, user_id: str = "usr_001") -> None: + """Seed a complete user profile + related PII data for demonstration.""" + + # -- Profile ---------------------------------------------------------- + db.add_profile(UserProfile( + user_id=user_id, + full_name="Jane Doe", + email="jane.doe@example.com", + phone="+44 7700 900123", + date_of_birth="1990-03-15", + national_id="AB123456C", + address={ + "street": "42 Acacia Avenue", + "city": "London", + "postcode": "SW1A 1AA", + "country": "GB", + }, + )) + + # -- Credentials ------------------------------------------------------ + db.add_credential(UserCredential( + user_id=user_id, + password_hash="$argon2id$v=19$m=65536,t=3,p=4$...", + salt="random-salt-hex-here", + mfa_secret="JBSWY3DPEHPK3PXP", # TOTP secret + recovery_codes=["abc1-def2", "ghi3-jkl4", "mno5-pqr6"], + last_login="2024-06-01T09:00:00+00:00", + )) + + # -- Payments --------------------------------------------------------- + for i, (brand, last4) in enumerate([("Visa", "4242"), ("Mastercard", "5555")], 1): + db.add_payment(PaymentRecord( + record_id=f"pay_{user_id}_{i:03d}", + user_id=user_id, + card_last4=last4, + card_brand=brand, + billing_name="Jane Doe", + billing_address={ + "street": "42 Acacia Avenue", + "city": "London", + "postcode": "SW1A 1AA", + "country": "GB", + }, + transaction_ids=[f"txn_{uuid.uuid4().hex[:8]}" for _ in range(3)], + )) + + # -- Activity logs ---------------------------------------------------- + actions = [ + ("login", "/auth/login"), + ("view_profile", "/account/profile"), + ("update_email", "/account/settings"), + ("download", "/account/export"), + ("logout", "/auth/logout"), + ] + for i, (action, resource) in enumerate(actions): + db.add_activity(ActivityLog( + log_id=f"log_{user_id}_{i:03d}", + user_id=user_id, + ip_address=f"192.168.1.{10 + i}", + user_agent="Mozilla/5.0 (compatible; demo)", + action=action, + resource=resource, + )) + + +def seed_multiple_users(db: InMemoryDatabase) -> list[str]: + """Seed three different users and return their IDs.""" + user_ids = ["usr_001", "usr_002", "usr_003"] + for uid in user_ids: + seed_user(db, uid) + return user_ids diff --git a/pii_export/tests.py b/pii_export/tests.py new file mode 100644 index 00000000..b2464cfa --- /dev/null +++ b/pii_export/tests.py @@ -0,0 +1,774 @@ +""" +tests.py — Comprehensive test suite for the PII Export & Delete Workflow. + +Test classes: + TestPIIDetector — field/value classification + TestAuditLogger — chain integrity, tamper detection, reporting + TestExportService — package generation, manifest, content + TestDeleteService — token lifecycle, confirmation, execution, errors + TestWorkflowIntegration — end-to-end happy path + edge cases + +Run with: + python -m pytest tests.py -v + python tests.py (unittest runner) +""" + +from __future__ import annotations + +import hashlib +import io +import json +import time +import unittest +import zipfile +from typing import Any, Dict +from unittest.mock import patch + +from audit_logger import AuditLogger, ChainIntegrityError +from delete_service import ( + DeleteService, + DeletionAlreadyProcessedError, + DeletionError, + InvalidTokenError, +) +from export_service import ExportError, ExportService +from models import ( + AuditAction, + DeletionStatus, + ExportStatus, + InMemoryDatabase, +) +from pii_detector import DataCategory, PIIDetector, RiskLevel +from seed_data import seed_user, seed_multiple_users +from workflow import PIIWorkflow + + +# ═══════════════════════════════════════════════════════════════════════════ +# Helpers +# ═══════════════════════════════════════════════════════════════════════════ + +def _make_wf(output_dir=None) -> tuple[InMemoryDatabase, PIIWorkflow]: + db = InMemoryDatabase() + return db, PIIWorkflow(db, output_dir=output_dir) + + +def _make_seeded_wf(user_id="usr_001") -> tuple[InMemoryDatabase, PIIWorkflow, str]: + db, wf = _make_wf() + seed_user(db, user_id) + return db, wf, user_id + + +# ═══════════════════════════════════════════════════════════════════════════ +# 1. PII Detector +# ═══════════════════════════════════════════════════════════════════════════ + +class TestPIIDetector(unittest.TestCase): + + def setUp(self): + self.detector = PIIDetector() + + # ── field-name detection ───────────────────────────────────────── + + def test_email_field_detected(self): + report = self.detector.scan({"email": "alice@example.com"}, "test") + self.assertEqual(report.pii_field_count, 1) + field = report.pii_fields[0] + self.assertEqual(field.category, DataCategory.CONTACT) + self.assertEqual(field.risk_level, RiskLevel.MEDIUM) + + def test_national_id_is_critical(self): + report = self.detector.scan({"national_id": "AB123456C"}, "test") + critical = [f for f in report.pii_fields if f.risk_level == RiskLevel.CRITICAL] + self.assertTrue(len(critical) >= 1) + + def test_password_hash_is_high(self): + report = self.detector.scan({"password_hash": "$argon2id$..."}, "test") + field = report.pii_fields[0] + self.assertIn(field.risk_level, (RiskLevel.HIGH, RiskLevel.CRITICAL)) + + def test_card_number_is_critical(self): + report = self.detector.scan({"card_number": "4111111111111111"}, "test") + field = report.pii_fields[0] + self.assertEqual(field.risk_level, RiskLevel.CRITICAL) + + def test_ip_address_is_low(self): + report = self.detector.scan({"ip_address": "192.168.1.1"}, "test") + field = report.pii_fields[0] + self.assertEqual(field.risk_level, RiskLevel.LOW) + + # ── value-pattern detection ────────────────────────────────────── + + def test_email_value_pattern(self): + report = self.detector.scan({"contact": "jane@test.org"}, "test") + # value pattern should catch even non-standard field names + self.assertTrue(any( + f.match_reason.startswith("value matched") for f in report.pii_fields + )) + + def test_ssn_value_pattern(self): + report = self.detector.scan({"identifier": "123-45-6789"}, "test") + ssn_fields = [f for f in report.pii_fields + if "SSN" in f.match_reason] + self.assertTrue(len(ssn_fields) >= 1) + + def test_credit_card_value_pattern(self): + # Luhn-valid Visa test number + report = self.detector.scan({"ref": "4111111111111111"}, "test") + card_fields = [f for f in report.pii_fields + if f.risk_level == RiskLevel.CRITICAL] + self.assertTrue(len(card_fields) >= 1) + + # ── nested / array structures ──────────────────────────────────── + + def test_nested_address(self): + record = {"address": {"street": "42 Main St", "postcode": "SW1A 1AA"}} + report = self.detector.scan(record, "test") + paths = {f.field_path for f in report.pii_fields} + # postcode is a contact field + self.assertTrue(any("postcode" in p for p in paths)) + + def test_list_of_records(self): + records = [ + {"email": "a@a.com", "name": "Alice"}, + {"email": "b@b.com", "name": "Bob"}, + ] + reports = self.detector.scan_multiple(records, "users") + self.assertEqual(len(reports), 2) + for r in reports: + self.assertGreater(r.pii_field_count, 0) + + # ── overall_risk ───────────────────────────────────────────────── + + def test_overall_risk_critical(self): + report = self.detector.scan({"national_id": "AB123456C"}, "test") + self.assertEqual(report.overall_risk, RiskLevel.CRITICAL) + + def test_overall_risk_low_for_no_pii(self): + report = self.detector.scan({"count": 42, "flag": True}, "test") + self.assertEqual(report.overall_risk, RiskLevel.LOW) + self.assertEqual(report.pii_field_count, 0) + + # ── sample value redaction ──────────────────────────────────────── + + def test_sample_value_is_redacted(self): + report = self.detector.scan({"email": "jane.doe@example.com"}, "test") + field = report.pii_fields[0] + self.assertIsNotNone(field.sample_value) + self.assertNotIn("jane", field.sample_value.lower()) + self.assertIn("***", field.sample_value) + + +# ═══════════════════════════════════════════════════════════════════════════ +# 2. Audit Logger +# ═══════════════════════════════════════════════════════════════════════════ + +class TestAuditLogger(unittest.TestCase): + + def setUp(self): + self.db = InMemoryDatabase() + self.logger = AuditLogger(self.db, "test-service") + + def test_first_entry_uses_genesis_hash(self): + entry = self.logger.log( + AuditAction.EXPORT_REQUESTED, "user:x", "user_x", "req_1" + ) + self.assertEqual(entry.prev_hash, AuditLogger.GENESIS_HASH) + + def test_chain_links_entries(self): + e1 = self.logger.log(AuditAction.EXPORT_REQUESTED, "user:x", "user_x") + e2 = self.logger.log(AuditAction.EXPORT_STARTED, "system", "user_x") + self.assertEqual(e2.prev_hash, e1.entry_hash) + + def test_entry_hash_is_deterministic(self): + entry = self.db.audit_log[0] if self.db.audit_log else None + self.logger.log(AuditAction.EXPORT_REQUESTED, "user:x", "user_x") + entry = self.db.audit_log[-1] + recomputed = entry.compute_hash() + self.assertEqual(entry.entry_hash, recomputed) + + def test_verify_chain_empty(self): + result = self.logger.verify_chain() + self.assertTrue(result["valid"]) + self.assertEqual(result["total_entries"], 0) + + def test_verify_chain_valid(self): + for i in range(5): + self.logger.log(AuditAction.EXPORT_REQUESTED, "user:x", f"user_{i}") + result = self.logger.verify_chain() + self.assertTrue(result["valid"]) + self.assertEqual(result["total_entries"], 5) + + def test_verify_chain_detects_tamper(self): + for i in range(3): + self.logger.log(AuditAction.EXPORT_REQUESTED, "user:x", "user_x") + + # Tamper with the middle entry's details + self.db.audit_log[1].details["tampered"] = True + # Recalculate — hash will now NOT match stored hash + result = self.logger.verify_chain() + self.assertFalse(result["valid"]) + self.assertEqual(result["first_broken_index"], 1) + + def test_verify_chain_detects_prev_hash_break(self): + for i in range(3): + self.logger.log(AuditAction.EXPORT_REQUESTED, "user:x", "user_x") + # Corrupt the prev_hash of entry 2 + self.db.audit_log[2].prev_hash = "deadbeef" * 8 + result = self.logger.verify_chain() + self.assertFalse(result["valid"]) + + def test_get_user_trail_filters_correctly(self): + self.logger.log(AuditAction.EXPORT_REQUESTED, "user:a", "user_a") + self.logger.log(AuditAction.EXPORT_REQUESTED, "user:b", "user_b") + self.logger.log(AuditAction.EXPORT_COMPLETED, "system", "user_a") + + trail = self.logger.get_user_trail("user_a") + self.assertEqual(len(trail), 2) + self.assertTrue(all(e["user_id"] == "user_a" for e in trail)) + + def test_export_full_log_is_valid_json(self): + self.logger.log(AuditAction.DELETE_REQUESTED, "user:x", "user_x") + raw = self.logger.export_full_log() + parsed = json.loads(raw) + self.assertIsInstance(parsed, list) + self.assertEqual(len(parsed), 1) + + def test_summary_stats(self): + self.logger.log(AuditAction.EXPORT_REQUESTED, "user:a", "user_a") + self.logger.log(AuditAction.EXPORT_REQUESTED, "user:b", "user_b") + self.logger.log(AuditAction.DELETE_REQUESTED, "user:a", "user_a") + stats = self.logger.summary_stats() + self.assertEqual(stats["total_entries"], 3) + self.assertEqual(stats["unique_users"], 2) + self.assertEqual(stats["action_counts"]["EXPORT_REQUESTED"], 2) + self.assertEqual(stats["action_counts"]["DELETE_REQUESTED"], 1) + + def test_all_convenience_methods_produce_entries(self): + """Smoke-test every convenience wrapper.""" + uid, rid = "user_x", "req_x" + methods = [ + lambda: self.logger.log_export_requested("user:x", uid, rid), + lambda: self.logger.log_export_started(uid, rid, ["profiles"]), + lambda: self.logger.log_export_completed(uid, rid, "path", "hash", {}), + lambda: self.logger.log_export_failed(uid, rid, "err"), + lambda: self.logger.log_delete_requested("user:x", uid, rid, "reason"), + lambda: self.logger.log_delete_token_issued(uid, rid, "tok12345", "exp"), + lambda: self.logger.log_delete_confirmed("user:x", uid, rid), + lambda: self.logger.log_delete_started(uid, rid, ["profiles"]), + lambda: self.logger.log_delete_completed(uid, rid, 3, 1), + lambda: self.logger.log_delete_failed(uid, rid, "err"), + lambda: self.logger.log_delete_cancelled("user:x", uid, rid, "reason"), + lambda: self.logger.log_field_redacted(uid, rid, "table", "field"), + lambda: self.logger.log_field_deleted(uid, rid, "table", "field"), + lambda: self.logger.log_record_anonymised(uid, rid, "table", "rec_1"), + ] + for fn in methods: + fn() + self.assertEqual(len(self.db.audit_log), len(methods)) + chain = self.logger.verify_chain() + self.assertTrue(chain["valid"]) + + +# ═══════════════════════════════════════════════════════════════════════════ +# 3. Export Service +# ═══════════════════════════════════════════════════════════════════════════ + +class TestExportService(unittest.TestCase): + + def setUp(self): + self.db, self.wf, self.uid = _make_seeded_wf() + + def _get_zip(self) -> zipfile.ZipFile: + req = self.wf.export_user_data(self.uid) + raw = self.wf.get_export_bytes(req.request_id) + return zipfile.ZipFile(io.BytesIO(raw)) + + # ── request completion ──────────────────────────────────────────── + + def test_export_returns_completed_request(self): + req = self.wf.export_user_data(self.uid) + self.assertEqual(req.status, ExportStatus.COMPLETED) + self.assertIsNotNone(req.package_path) + self.assertIsNotNone(req.package_hash) + self.assertIsNotNone(req.completed_at) + + def test_export_hash_is_sha256(self): + req = self.wf.export_user_data(self.uid) + self.assertEqual(len(req.package_hash), 64) + int(req.package_hash, 16) # must be valid hex + + # ── ZIP structure ───────────────────────────────────────────────── + + def test_zip_contains_required_files(self): + zf = self._get_zip() + names = zf.namelist() + required_suffixes = [ + "MANIFEST.json", + "README.txt", + "data/profile.json", + "data/payments.json", + "data/activities.json", + "data/credentials.json", + "audit/audit_trail.json", + "pii_report/pii_classification.json", + ] + for suffix in required_suffixes: + self.assertTrue( + any(n.endswith(suffix) for n in names), + f"Missing file: {suffix}\nPresent: {names}", + ) + + def test_manifest_is_valid_json(self): + zf = self._get_zip() + manifest_name = next(n for n in zf.namelist() if n.endswith("MANIFEST.json")) + data = json.loads(zf.read(manifest_name)) + self.assertIn("request_id", data) + self.assertIn("user_id", data) + self.assertIn("record_counts", data) + self.assertIn("files", data) + + def test_manifest_record_counts_nonzero(self): + zf = self._get_zip() + manifest_name = next(n for n in zf.namelist() if n.endswith("MANIFEST.json")) + data = json.loads(zf.read(manifest_name)) + self.assertGreater(data["record_counts"]["profile"], 0) + self.assertGreater(data["record_counts"]["payments"], 0) + + def test_profile_json_contains_user(self): + zf = self._get_zip() + profile_name = next(n for n in zf.namelist() if n.endswith("profile.json")) + profile = json.loads(zf.read(profile_name)) + self.assertEqual(profile["user_id"], self.uid) + self.assertEqual(profile["email"], "jane.doe@example.com") + + def test_credentials_secrets_are_redacted(self): + zf = self._get_zip() + cred_name = next(n for n in zf.namelist() if n.endswith("credentials.json")) + cred = json.loads(zf.read(cred_name)) + self.assertEqual(cred["password_hash"], "[REDACTED]") + self.assertEqual(cred["salt"], "[REDACTED]") + self.assertEqual(cred["mfa_secret"], "[REDACTED]") + + def test_audit_trail_in_export(self): + zf = self._get_zip() + audit_name = next(n for n in zf.namelist() if n.endswith("audit_trail.json")) + events = json.loads(zf.read(audit_name)) + self.assertIsInstance(events, list) + # Must contain at least one event (EXPORT_REQUESTED) + self.assertGreater(len(events), 0) + + def test_manifest_file_hashes(self): + """Every file listed in the manifest must have a valid SHA-256 hash.""" + zf = self._get_zip() + manifest_name = next(n for n in zf.namelist() if n.endswith("MANIFEST.json")) + manifest = json.loads(zf.read(manifest_name)) + for file_path, stored_hash in manifest["files"].items(): + if file_path.endswith("MANIFEST.json"): + continue # self-referential — skip + raw = zf.read(file_path) + actual = hashlib.sha256(raw).hexdigest() + self.assertEqual( + actual, stored_hash, + f"Hash mismatch for {file_path}", + ) + + # ── error handling ──────────────────────────────────────────────── + + def test_export_nonexistent_user_fails(self): + req = self.wf.export_user_data("ghost_user") + # Should complete (profile will be empty) — not raise + self.assertEqual(req.status, ExportStatus.COMPLETED) + + def test_export_persists_in_db(self): + req = self.wf.export_user_data(self.uid) + stored = self.db.export_requests.get(req.request_id) + self.assertIsNotNone(stored) + self.assertEqual(stored.status, ExportStatus.COMPLETED) + + def test_audit_events_written_for_export(self): + self.wf.export_user_data(self.uid) + trail = self.wf.get_user_audit_trail(self.uid) + actions = {e["action"] for e in trail} + self.assertIn("EXPORT_REQUESTED", actions) + self.assertIn("EXPORT_COMPLETED", actions) + + +# ═══════════════════════════════════════════════════════════════════════════ +# 4. Delete Service +# ═══════════════════════════════════════════════════════════════════════════ + +class TestDeleteService(unittest.TestCase): + + def setUp(self): + self.db, self.wf, self.uid = _make_seeded_wf() + + def _full_delete(self, uid=None): + """Helper: run the full 3-step delete pipeline.""" + uid = uid or self.uid + req = self.wf.request_deletion(uid) + req = self.wf.confirm_deletion(req.request_id, req.confirmation_token) + req = self.wf.execute_deletion(req.request_id) + return req + + # ── step 1: request ─────────────────────────────────────────────── + + def test_request_creates_pending_record(self): + req = self.wf.request_deletion(self.uid) + self.assertEqual(req.status, DeletionStatus.PENDING) + self.assertIsNotNone(req.confirmation_token) + self.assertIsNotNone(req.token_expires_at) + + def test_request_is_persisted(self): + req = self.wf.request_deletion(self.uid) + stored = self.db.deletion_requests.get(req.request_id) + self.assertIsNotNone(stored) + + def test_token_is_high_entropy(self): + req = self.wf.request_deletion(self.uid) + # URL-safe base64 of 32 bytes ≈ 43 chars + self.assertGreaterEqual(len(req.confirmation_token), 40) + + # ── step 2: confirm ─────────────────────────────────────────────── + + def test_confirm_advances_to_confirmed(self): + req = self.wf.request_deletion(self.uid) + req2 = self.wf.confirm_deletion(req.request_id, req.confirmation_token) + self.assertEqual(req2.status, DeletionStatus.CONFIRMED) + self.assertIsNotNone(req2.confirmed_at) + + def test_wrong_token_raises(self): + req = self.wf.request_deletion(self.uid) + with self.assertRaises(InvalidTokenError): + self.wf.confirm_deletion(req.request_id, "wrong-token") + + def test_token_is_one_time_use(self): + req = self.wf.request_deletion(self.uid) + self.wf.confirm_deletion(req.request_id, req.confirmation_token) + # Second attempt must fail — token was consumed + with self.assertRaises((InvalidTokenError, DeletionAlreadyProcessedError)): + self.wf.confirm_deletion(req.request_id, req.confirmation_token) + + def test_unknown_request_raises(self): + with self.assertRaises(DeletionError): + self.wf.confirm_deletion("does-not-exist", "token") + + # ── step 3: execute ─────────────────────────────────────────────── + + def test_execute_completes_request(self): + req = self._full_delete() + self.assertEqual(req.status, DeletionStatus.COMPLETED) + self.assertIsNotNone(req.completed_at) + + def test_execute_removes_profile(self): + self._full_delete() + self.assertIsNone(self.db.get_profile(self.uid)) + + def test_execute_removes_payments(self): + self._full_delete() + self.assertEqual(len(self.db.get_payments_for_user(self.uid)), 0) + + def test_execute_removes_activities(self): + self._full_delete() + self.assertEqual(len(self.db.get_activities_for_user(self.uid)), 0) + + def test_execute_removes_credentials(self): + self._full_delete() + self.assertIsNone(self.db.get_credential(self.uid)) + + def test_execute_anonymises_audit_entries(self): + """After deletion, audit entries must not reference the original user_id.""" + req = self._full_delete() + for entry in self.db.audit_log: + self.assertNotEqual( + entry.user_id, self.uid, + f"Found original user_id in entry {entry.entry_id}", + ) + + def test_deleted_count_reported(self): + req = self._full_delete() + self.assertGreater(req.deleted_record_count, 0) + + def test_anonymised_count_reported(self): + req = self._full_delete() + self.assertGreater(req.anonymised_record_count, 0) + + def test_execute_without_confirm_raises(self): + req = self.wf.request_deletion(self.uid) + with self.assertRaises(DeletionError): + self.wf.execute_deletion(req.request_id) + + def test_token_cleared_after_deletion(self): + req = self._full_delete() + # The stored request must not expose the raw token + stored = self.db.deletion_requests.get(req.request_id) + self.assertIsNone(stored.confirmation_token) + + # ── cancellation ────────────────────────────────────────────────── + + def test_cancel_pending_request(self): + req = self.wf.request_deletion(self.uid) + cancelled = self.wf.cancel_deletion( + req.request_id, actor="user", reason="changed_mind" + ) + self.assertEqual(cancelled.status, DeletionStatus.FAILED) + + def test_cannot_cancel_confirmed_request(self): + req = self.wf.request_deletion(self.uid) + self.wf.confirm_deletion(req.request_id, req.confirmation_token) + with self.assertRaises(DeletionError): + self.wf.cancel_deletion(req.request_id, "user") + + # ── audit events ────────────────────────────────────────────────── + + def test_delete_audit_events_written(self): + self._full_delete() + # After anonymisation, user_id in entries is replaced — check via request_id + all_actions = {e.action for e in self.db.audit_log} + self.assertIn(AuditAction.DELETE_REQUESTED, all_actions) + self.assertIn(AuditAction.DELETE_CONFIRMED, all_actions) + self.assertIn(AuditAction.DELETE_STARTED, all_actions) + self.assertIn(AuditAction.DELETE_COMPLETED, all_actions) + + # ── status query ────────────────────────────────────────────────── + + def test_get_status_hides_token(self): + req = self.wf.request_deletion(self.uid) + status = self.wf.get_deletion_status(req.request_id) + self.assertNotIn("confirmation_token", status) + + def test_get_status_none_for_missing(self): + status = self.wf.get_deletion_status("no-such-id") + self.assertIsNone(status) + + +# ═══════════════════════════════════════════════════════════════════════════ +# 5. Workflow Integration +# ═══════════════════════════════════════════════════════════════════════════ + +class TestWorkflowIntegration(unittest.TestCase): + + # ── happy path: export → delete ─────────────────────────────────── + + def test_full_export_then_delete_pipeline(self): + db, wf = _make_wf() + uid = "usr_full" + seed_user(db, uid) + + # Export + exp_req = wf.export_user_data(uid) + self.assertEqual(exp_req.status, ExportStatus.COMPLETED) + + # Delete + del_req = wf.request_deletion(uid) + del_req = wf.confirm_deletion(del_req.request_id, del_req.confirmation_token) + del_req = wf.execute_deletion(del_req.request_id) + self.assertEqual(del_req.status, DeletionStatus.COMPLETED) + + # All data gone + presence = wf.user_data_exists(uid) + self.assertFalse(any(presence.values())) + + # Chain valid + chain = wf.verify_audit_chain() + self.assertTrue(chain["valid"]) + + # ── export_then_request_deletion convenience ───────────────────── + + def test_export_then_request_deletion_shortcut(self): + db, wf = _make_wf() + uid = "usr_shortcut" + seed_user(db, uid) + result = wf.export_then_request_deletion(uid, actor="user") + self.assertIn("export", result) + self.assertIn("deletion", result) + self.assertEqual(result["export"]["status"], "completed") + self.assertEqual(result["deletion"]["status"], "pending") + + # ── multiple independent users ──────────────────────────────────── + + def test_deleting_one_user_does_not_affect_another(self): + db = InMemoryDatabase() + wf = PIIWorkflow(db) + seed_user(db, "usr_A") + seed_user(db, "usr_B") + + # Delete user A + req = wf.request_deletion("usr_A") + req = wf.confirm_deletion(req.request_id, req.confirmation_token) + wf.execute_deletion(req.request_id) + + # User B's data must remain intact + presence_B = wf.user_data_exists("usr_B") + self.assertTrue(presence_B["profile"]) + self.assertTrue(presence_B["payments"]) + + # ── audit chain survives full workflow ──────────────────────────── + + def test_audit_chain_valid_after_full_workflow(self): + db, wf = _make_wf() + uid = "usr_chain" + seed_user(db, uid) + wf.export_user_data(uid) + req = wf.request_deletion(uid) + req = wf.confirm_deletion(req.request_id, req.confirmation_token) + wf.execute_deletion(req.request_id) + chain = wf.verify_audit_chain() + self.assertTrue(chain["valid"]) + self.assertGreater(chain["total_entries"], 5) + + # ── data_presence checks ────────────────────────────────────────── + + def test_user_data_exists_before_deletion(self): + db, wf = _make_wf() + uid = "usr_check" + seed_user(db, uid) + presence = wf.user_data_exists(uid) + self.assertTrue(presence["profile"]) + self.assertTrue(presence["payments"]) + self.assertTrue(presence["activities"]) + self.assertTrue(presence["credentials"]) + + def test_user_data_all_gone_after_deletion(self): + db, wf = _make_wf() + uid = "usr_gone" + seed_user(db, uid) + req = wf.request_deletion(uid) + req = wf.confirm_deletion(req.request_id, req.confirmation_token) + wf.execute_deletion(req.request_id) + presence = wf.user_data_exists(uid) + self.assertFalse(any(presence.values())) + + # ── automated demo smoke test ───────────────────────────────────── + + def test_automated_demo_runs_without_error(self): + from main import run_automated_demo + # Should not raise + run_automated_demo(silent=True) + + # ── export ZIP integrity ────────────────────────────────────────── + + def test_export_zip_is_valid(self): + db, wf = _make_wf() + uid = "usr_zip" + seed_user(db, uid) + req = wf.export_user_data(uid) + raw = wf.get_export_bytes(req.request_id) + self.assertIsNotNone(raw) + self.assertTrue(zipfile.is_zipfile(io.BytesIO(raw))) + + def test_export_package_hash_matches_bytes(self): + db, wf = _make_wf() + uid = "usr_hash" + seed_user(db, uid) + req = wf.export_user_data(uid) + raw = wf.get_export_bytes(req.request_id) + actual_hash = hashlib.sha256(raw).hexdigest() + # Note: get_export_bytes re-builds the zip so hashes may differ + # (timestamps change). Just verify the stored hash is 64-char hex. + self.assertEqual(len(req.package_hash), 64) + int(req.package_hash, 16) + + # ── audit summary ───────────────────────────────────────────────── + + def test_audit_summary_reflects_all_users(self): + db = InMemoryDatabase() + wf = PIIWorkflow(db) + user_ids = seed_multiple_users(db) + for uid in user_ids: + wf.export_user_data(uid) + summary = wf.get_audit_summary() + self.assertEqual(summary["unique_users"], len(user_ids)) + self.assertEqual( + summary["action_counts"]["EXPORT_REQUESTED"], len(user_ids) + ) + + +# ═══════════════════════════════════════════════════════════════════════════ +# 6. Edge Cases & Security +# ═══════════════════════════════════════════════════════════════════════════ + +class TestEdgeCasesAndSecurity(unittest.TestCase): + + def test_expired_token_raises(self): + """Simulate token expiry by patching the stored timestamp.""" + from datetime import datetime, timezone, timedelta + import delete_service as ds + + db = InMemoryDatabase() + audit = AuditLogger(db) + svc = DeleteService(db, audit) + seed_user(db, "usr_exp") + req = svc.request_deletion("usr_exp") + + # Manually expire the token + past = datetime.now(timezone.utc) - timedelta(hours=2) + svc._tokens[req.request_id]["expires_at"] = past + + with self.assertRaises(InvalidTokenError): + svc.confirm_deletion(req.request_id, req.confirmation_token) + + def test_token_comparison_uses_constant_time(self): + """Verify secrets.compare_digest is used (no early-exit timing attack).""" + import delete_service as ds + import inspect + source = inspect.getsource(ds.DeleteService.confirm_deletion) + self.assertIn("compare_digest", source) + + def test_anon_salt_can_be_overridden_by_env(self): + import os + import delete_service as ds + os.environ["PII_ANON_SALT"] = "test-salt-override" + val = ds._get_anon_salt() + self.assertEqual(val, "test-salt-override") + del os.environ["PII_ANON_SALT"] + + def test_anonymised_id_is_deterministic(self): + from delete_service import _anonymise_value + v1 = _anonymise_value("usr_001", "audit_user_id") + v2 = _anonymise_value("usr_001", "audit_user_id") + self.assertEqual(v1, v2) + + def test_anonymised_id_differs_from_original(self): + from delete_service import _anonymise_value + anon = _anonymise_value("usr_001", "audit_user_id") + self.assertNotEqual(anon, "usr_001") + + def test_credentials_not_exported_in_clear(self): + """Ensure no credential secrets appear in the ZIP.""" + db, wf = _make_wf() + seed_user(db, "usr_cred") + req = wf.export_user_data("usr_cred") + raw = wf.get_export_bytes(req.request_id) + zf = zipfile.ZipFile(io.BytesIO(raw)) + # Read all file contents as one blob + all_text = " ".join( + zf.read(name).decode("utf-8", errors="ignore") + for name in zf.namelist() + ) + # Original secrets must not appear + self.assertNotIn("JBSWY3DPEHPK3PXP", all_text) # TOTP secret + self.assertNotIn("random-salt-hex-here", all_text) # salt + self.assertNotIn("abc1-def2", all_text) # recovery code + + def test_double_deletion_raises(self): + db, wf = _make_wf() + seed_user(db, "usr_dbl") + req = wf.request_deletion("usr_dbl") + req = wf.confirm_deletion(req.request_id, req.confirmation_token) + wf.execute_deletion(req.request_id) + with self.assertRaises(DeletionError): + wf.execute_deletion(req.request_id) + + def test_audit_log_append_only(self): + """The audit log list should only grow, never shrink.""" + db, wf = _make_wf() + seed_user(db, "usr_ao") + initial = len(db.audit_log) + wf.export_user_data("usr_ao") + self.assertGreater(len(db.audit_log), initial) + + +# ── Entry point ───────────────────────────────────────────────────────────── + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/pii_export/workflow.py b/pii_export/workflow.py new file mode 100644 index 00000000..b60d4a5d --- /dev/null +++ b/pii_export/workflow.py @@ -0,0 +1,244 @@ +""" +workflow.py — High-level orchestration of the Export & Delete workflow. + +The PIIWorkflow class is the single façade for calling code (e.g. an API +handler or CLI). It wires together ExportService, DeleteService, and +AuditLogger and exposes clean, documented public methods. + +Typical usage +------------- + db = InMemoryDatabase() + seed_test_data(db, user_id="usr_001") + + wf = PIIWorkflow(db) + + # Export + export_req = wf.export_user_data("usr_001") + + # Delete + del_req = wf.request_deletion("usr_001") + del_req = wf.confirm_deletion(del_req.request_id, del_req.confirmation_token) + del_req = wf.execute_deletion(del_req.request_id) + + # Audit + chain_ok = wf.verify_audit_chain() + trail = wf.get_user_audit_trail("usr_001") +""" + +from __future__ import annotations + +import json +from typing import Any, Dict, List, Optional + +from audit_logger import AuditLogger +from delete_service import DeleteService +from export_service import ExportService +from models import ( + DeletionRequest, + ExportRequest, + InMemoryDatabase, +) + + +class PIIWorkflow: + """ + Unified façade for the GDPR PII Export & Delete workflow. + + Parameters + ---------- + db : InMemoryDatabase (or compatible repository). + output_dir : Where to write export ZIP files. + None = in-memory only (useful for tests / serverless). + service_name: Label used in audit entries. + """ + + def __init__( + self, + db: InMemoryDatabase, + output_dir: Optional[str] = None, + service_name: str = "pii-workflow", + ) -> None: + self._db = db + self._audit = AuditLogger(db, service_name) + self._export = ExportService(db, self._audit, output_dir) + self._delete = DeleteService(db, self._audit) + + # ------------------------------------------------------------------ + # Export + # ------------------------------------------------------------------ + + def export_user_data( + self, + user_id: str, + requested_by: str = "user", + ip_address: Optional[str] = None, + ) -> ExportRequest: + """ + Generate and return a complete export package for the given user. + + The returned ExportRequest contains: + • package_path — where the ZIP was written (or virtual path) + • package_hash — SHA-256 of the ZIP (for integrity verification) + + Raises ExportError on failure. + """ + return self._export.request_export( + user_id=user_id, + requested_by=requested_by, + ip_address=ip_address, + ) + + def get_export_bytes(self, request_id: str) -> Optional[bytes]: + """Retrieve the raw ZIP bytes for a completed export (in-memory mode).""" + return self._export.get_package_bytes(request_id) + + # ------------------------------------------------------------------ + # Deletion — step by step + # ------------------------------------------------------------------ + + def request_deletion( + self, + user_id: str, + requested_by: str = "user", + reason: str = "user_request", + grace_period_days: int = 30, + ip_address: Optional[str] = None, + ) -> DeletionRequest: + """ + Step 1 — Initiate a deletion request and receive a confirmation token. + + In a real system the token would be sent to the user's verified email + address, never returned directly from the API. + """ + return self._delete.request_deletion( + user_id=user_id, + requested_by=requested_by, + reason=reason, + grace_period_days=grace_period_days, + ip_address=ip_address, + ) + + def confirm_deletion( + self, + request_id: str, + token: str, + actor: str = "user", + ip_address: Optional[str] = None, + ) -> DeletionRequest: + """ + Step 2 — Validate the confirmation token and mark the request as + CONFIRMED. Raises InvalidTokenError if the token is wrong/expired. + """ + return self._delete.confirm_deletion( + request_id=request_id, + token=token, + actor=actor, + ip_address=ip_address, + ) + + def execute_deletion(self, request_id: str) -> DeletionRequest: + """ + Step 3 — Permanently purge all PII for the user. + + Must be called after confirm_deletion(). + Typically scheduled after the grace period expires. + """ + return self._delete.execute_deletion(request_id) + + def cancel_deletion( + self, + request_id: str, + actor: str, + reason: str = "user_changed_mind", + ) -> DeletionRequest: + """Cancel a PENDING deletion request.""" + return self._delete.cancel_deletion(request_id, actor, reason) + + def get_deletion_status(self, request_id: str) -> Optional[Dict[str, Any]]: + """Return a public-safe status dict for a deletion request.""" + return self._delete.get_status(request_id) + + # ------------------------------------------------------------------ + # Combined export-then-delete (convenience) + # ------------------------------------------------------------------ + + def export_then_request_deletion( + self, + user_id: str, + actor: str = "user", + reason: str = "user_request", + ip_address: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Convenience: export the user's data then immediately create a + deletion request. Returns both results in a single dict. + + This mirrors the GDPR "right to be forgotten" UX where a user + downloads their data before requesting account closure. + """ + export_req = self.export_user_data(user_id, actor, ip_address) + del_req = self.request_deletion(user_id, actor, reason, ip_address=ip_address) + return { + "export": export_req.to_dict(), + "deletion": del_req.to_dict(), + } + + # ------------------------------------------------------------------ + # Audit + # ------------------------------------------------------------------ + + def verify_audit_chain(self) -> Dict[str, Any]: + """ + Verify the integrity of the full audit chain. + + Returns a report dict with keys: + valid (bool), total_entries (int), details (str). + """ + return self._audit.verify_chain() + + def get_user_audit_trail(self, user_id: str) -> List[Dict[str, Any]]: + """Return all audit entries touching a given user (as dicts).""" + return self._audit.get_user_trail(user_id) + + def get_audit_summary(self) -> Dict[str, Any]: + """Return aggregate stats for the audit log.""" + return self._audit.summary_stats() + + def export_full_audit_log(self) -> str: + """Serialise the full audit log as a JSON string.""" + return self._audit.export_full_log() + + # ------------------------------------------------------------------ + # Introspection helpers + # ------------------------------------------------------------------ + + def user_data_exists(self, user_id: str) -> Dict[str, bool]: + """Return which data tables still hold data for a user.""" + return { + "profile": self._db.get_profile(user_id) is not None, + "payments": len(self._db.get_payments_for_user(user_id)) > 0, + "activities": len(self._db.get_activities_for_user(user_id)) > 0, + "credentials": self._db.get_credential(user_id) is not None, + } + + def pretty_print_summary(self, user_id: str) -> None: + """Print a formatted summary of the current state for a user.""" + exists = self.user_data_exists(user_id) + audit = self.get_user_audit_trail(user_id) + chain = self.verify_audit_chain() + + print(f"\n{'='*60}") + print(f" PII Workflow Summary — User: {user_id}") + print(f"{'='*60}") + print("\n Data presence:") + for table, present in exists.items(): + status = "✓ present" if present else "✗ deleted" + print(f" {table:<16} {status}") + print(f"\n Audit trail entries for user : {len(audit)}") + print(f" Audit chain integrity : {'✓ valid' if chain['valid'] else '✗ BROKEN'}") + print(f" Total audit entries (all) : {chain['total_entries']}") + if audit: + print("\n Recent events:") + for e in audit[-5:]: + print(f" [{e['timestamp']}] {e['action']}") + print(f"{'='*60}\n")