Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/providers/freesat.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from typing import List, Dict, Any

from ..utils.parsing import parse_duration_value, parse_timestamp
from .base import Context


Expand Down Expand Up @@ -88,14 +89,16 @@ def fetch_programmes(channel: Dict[str, Any], ctx: Context) -> List[Dict[str, An
for item in epg_data:
title = item.get("name")
desc = item.get("description")
start = item.get("startTime")
duration = item.get("duration", 0)
if start is None:
start_raw = item.get("startTime")
duration_raw = item.get("duration", 0)
if start_raw is None:
continue
try:
start = parse_timestamp(start_raw)
duration = parse_duration_value(duration_raw)
end = start + duration
except Exception:
end = start
continue
icon = None
if item.get("image"):
icon = (
Expand Down
10 changes: 4 additions & 6 deletions src/providers/freeview.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any, Tuple, Optional

from ..xmltv import parse_duration
from ..utils.parsing import parse_duration_value, parse_timestamp
from .base import Context


Expand Down Expand Up @@ -71,13 +71,11 @@ def fetch_programmes(channel: Dict[str, Any], ctx: Context) -> List[Dict[str, An
if not start_time_str or not duration_str:
continue
try:
start_dt = datetime.strptime(start_time_str, "%Y-%m-%dT%H:%M:%S%z")
duration_td = parse_duration(duration_str)
start_ts = parse_timestamp(start_time_str)
duration_seconds = parse_duration_value(duration_str)
except Exception:
continue
end_dt = start_dt + duration_td
start_ts = start_dt.timestamp()
end_ts = end_dt.timestamp()
end_ts = start_ts + duration_seconds
# Build a cache key for the detail request
service_id = service.get("service_id")
program_id = listing.get("program_id")
Expand Down
6 changes: 4 additions & 2 deletions src/providers/sky.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import datetime, timedelta
from typing import List, Dict, Any

from ..utils.parsing import parse_duration_value, parse_timestamp
from .base import Context


Expand Down Expand Up @@ -52,8 +53,9 @@ def fetch_programmes(channel: Dict[str, Any], ctx: Context) -> List[Dict[str, An
if start_raw is None or duration_raw is None:
continue
try:
start = int(start_raw)
end = start + int(duration_raw)
start = parse_timestamp(start_raw)
duration = parse_duration_value(duration_raw)
end = start + duration
except Exception:
continue
# Determine the best available icon based on identifiers
Expand Down
5 changes: 5 additions & 0 deletions src/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Utility helpers for providers and core modules."""

from .parsing import parse_duration_value, parse_timestamp, pick_first_text

__all__ = ["parse_duration_value", "parse_timestamp", "pick_first_text"]
109 changes: 109 additions & 0 deletions src/utils/parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""Shared parsing helpers for provider data."""

from __future__ import annotations

import re
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional, Union


_EPOCH_MILLIS_THRESHOLD = 10**11


def parse_timestamp(value: Union[str, int, float, datetime]) -> int:
"""Parse a timestamp from ISO 8601 strings or epoch seconds/millis.

Args:
value: ISO 8601 string, epoch seconds, epoch millis, or datetime.

Returns:
The timestamp in epoch seconds.
"""
if value is None:
raise ValueError("timestamp value is required")
if isinstance(value, datetime):
dt = value
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return int(dt.timestamp())
if isinstance(value, (int, float)):
return _parse_epoch_number(value)
if isinstance(value, str):
text = value.strip()
if not text:
raise ValueError("timestamp value is empty")
if re.fullmatch(r"-?\d+(?:\.\d+)?", text):
return _parse_epoch_number(float(text))
dt = _parse_iso_datetime(text)
return int(dt.timestamp())
raise TypeError(f"unsupported timestamp type: {type(value)!r}")


def parse_duration_value(value: Union[str, int, float, timedelta]) -> int:
"""Parse a duration from seconds or ISO 8601 strings.

Args:
value: Duration in seconds or ISO 8601 duration string.

Returns:
Duration in seconds.
"""
if value is None:
raise ValueError("duration value is required")
if isinstance(value, timedelta):
return int(value.total_seconds())
if isinstance(value, (int, float)):
return int(value)
if isinstance(value, str):
text = value.strip()
if not text:
raise ValueError("duration value is empty")
if re.fullmatch(r"-?\d+(?:\.\d+)?", text):
return int(float(text))
if text.upper().startswith("P"):
return _parse_iso_duration_seconds(text)
raise ValueError(f"invalid duration value: {text}")
raise TypeError(f"unsupported duration type: {type(value)!r}")


def pick_first_text(values: Iterable[Optional[str]]) -> Optional[str]:
"""Return the first non-empty string from the iterable."""
for value in values:
if isinstance(value, str) and value.strip():
return value
return None


def _parse_epoch_number(value: Union[int, float]) -> int:
"""Convert epoch seconds or milliseconds to epoch seconds."""
if abs(value) >= _EPOCH_MILLIS_THRESHOLD:
return int(value / 1000)
return int(value)


def _parse_iso_datetime(text: str) -> datetime:
"""Parse an ISO 8601 timestamp string into a timezone-aware datetime."""
normalized = text.replace("Z", "+00:00")
try:
dt = datetime.fromisoformat(normalized)
except ValueError:
dt = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S%z")
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt


def _parse_iso_duration_seconds(iso_duration: str) -> int:
"""Parse an ISO 8601 duration string into seconds."""
match = re.match(
r"^P(?:(\d+)Y)?(?:(\d+)M)?(?:(\d+)D)?T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?$",
iso_duration,
)
if match is None:
raise ValueError(f"invalid ISO 8601 duration string: {iso_duration}")
days = int(match.group(3) or 0)
hours = int(match.group(4) or 0)
minutes = int(match.group(5) or 0)
seconds = float(match.group(6) or 0.0)
duration = timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
return int(duration.total_seconds())