diff --git a/README.md b/README.md index d3227a4..e1f9200 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ The following price sources are available: | Coincap | `beanprice.coincap` | [Most common (crypto)currencies](https://docs.coincap.io) | USD | ✓ | ✓ | | Coinmarketcap | `beanprice.coinmarketcap` | [Most common (crypto)currencies](https://coinmarketcap.com/api/documentation/v1/) | Many Currencies | ✓ | ✕ | | European Central Bank API| `beanprice.ecbrates` | [Many currencies](https://data.ecb.europa.eu/search-results?searchTerm=exchange%20rates) | [Many currencies](https://data.ecb.europa.eu/search-results?searchTerm=exchange%20rates) (Derived from EUR rates)| ✓ | ✓ | +| Financial Times | `beanprice.ft` | [Stocks](https://markets.ft.com/) | Many currencies | ✓ | ✓ | | OANDA | `beanprice.oanda` | [Many currencies](https://developer.oanda.com/exchange-rates-api/v1/currencies/) | [Many currencies](https://developer.oanda.com/exchange-rates-api/v1/currencies/) | ✓ | ✓ | | Quandl | `beanprice.quandl` | [Various datasets](https://www.quandl.com/search) | [Various datasets](https://www.quandl.com/search) | ✓ | ✓ | | Rates API | `beanprice.ratesapi` | [Many currencies](https://api.exchangerate.host/symbols) | [Many currencies](https://api.exchangerate.host/symbols) | ✓ | ✓ | diff --git a/beanprice/sources/ft.py b/beanprice/sources/ft.py new file mode 100644 index 0000000..064430b --- /dev/null +++ b/beanprice/sources/ft.py @@ -0,0 +1,197 @@ +"""Fetch prices from markets.ft.com. + +This source uses Financial Times JSON API to fetch prices. +It requires an internal FT ID (xid) which is extracted from the tearsheet page. +""" + +__copyright__ = "Copyright (C) 2026 Roman Medvedev" +__license__ = "GNU GPLv2" + +import datetime +from decimal import Decimal +import json +import re +import urllib.request +import urllib.parse +import urllib.error +from typing import List, Optional, Dict, Any + +from beanprice import source + +class FTError(ValueError): + """An error from the FT source.""" + +_USER_AGENT = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/120.0.0.0 Safari/537.36" +) + +def get_url(url: str, params: Optional[Dict[str, Any]] = None) -> str: + """Fetch content from a URL using urllib.""" + if params: + url = f"{url}?{urllib.parse.urlencode(params)}" + + headers = { + "User-Agent": _USER_AGENT, + "Accept": "application/json, text/plain, */*", + } + req = urllib.request.Request(url, headers=headers) + try: + with urllib.request.urlopen(req, timeout=10) as response: + return response.read().decode('utf-8') + except urllib.error.URLError as exc: + raise FTError(f"Network error fetching {url}: {exc}") from exc + +def post_json(url: str, data: Dict[str, Any]) -> str: + """Post JSON data to a URL using urllib.""" + headers = { + "User-Agent": _USER_AGENT, + "Content-Type": "application/json", + "Accept": "application/json, text/plain, */*", + } + body = json.dumps(data).encode('utf-8') + req = urllib.request.Request(url, data=body, headers=headers, method='POST') + try: + with urllib.request.urlopen(req, timeout=10) as response: + return response.read().decode('utf-8') + except urllib.error.URLError as exc: + raise FTError(f"Network error posting to {url}: {exc}") from exc + +class Source(source.Source): + """Financial Times price extractor.""" + + def __init__(self): + self._xid_cache = {} + + def _get_xid(self, ticker: str) -> str: + """Get the internal FT ID (xid) for a ticker.""" + if ticker in self._xid_cache: + return self._xid_cache[ticker] + + url = f"https://markets.ft.com/data/equities/tearsheet/summary?s={ticker}" + content = get_url(url) + + # Look for xid followed by colon/equals, optional quotes, and digits + pattern = ( + r'(?:xid|"xid")\s*[:=]\s*' + r'(?:["\']|")?(\d+)(?:["\']|")?' + ) + match = re.search(pattern, content) + if not match: + raise FTError(f"Could not determine internal FT ID for ticker {ticker}") + + xid = match.group(1) + self._xid_cache[ticker] = xid + return xid + + def _fetch_history(self, ticker: str, days: int) -> List[Dict[str, Any]]: + """Fetch historical series from FT.""" + xid = self._get_xid(ticker) + url = "https://markets.ft.com/data/chartapi/series" + + payload = { + "days": days, + "dataNormalized": False, + "dataPeriod": "Day", + "dataInterval": 1, + "realtime": False, + "yFormat": "0.###", + "timeServiceFormat": "JSON", + "returnDateType": "ISO8601", + "elements": [ + {"Type": "price", "Symbol": xid} + ] + } + + response_text = post_json(url, payload) + try: + data = json.loads(response_text) + except json.JSONDecodeError as exc: + raise FTError(f"Invalid JSON response from FT for {ticker}: {exc}") from exc + + dates = data.get("Dates", []) + elements = data.get("Elements", []) + if not dates or not elements: + return [] + + price_comp = next((e for e in elements if e.get("Type") == "price"), None) + if not price_comp: + return [] + + ohlc = price_comp.get("ComponentSeries", []) + # Provide default empty list if "Values" missing + closes: List[Any] = next( + (s.get("Values", []) for s in ohlc if s.get("Type") == "Close"), [] + ) + + history = [] + for i, d_str in enumerate(dates): + if i < len(closes) and closes[i] is not None: + # FT dates are like "2025-12-29T00:00:00" + try: + date_time = datetime.datetime.fromisoformat(d_str).replace( + tzinfo=datetime.timezone.utc + ) + history.append({ + "time": date_time, + "price": Decimal(str(closes[i])) + }) + except ValueError: + continue # Skip invalid dates + return history + + def get_latest_price(self, ticker: str) -> Optional[source.SourcePrice]: + """Fetch the current latest price.""" + try: + # Fetch last 5 days to ensure we get a price (weekends) + history = self._fetch_history(ticker, 5) + if not history: + return None + + latest = history[-1] + return source.SourcePrice(latest["price"], latest["time"], None) + except FTError as error: + raise FTError("%s (ticker: %s)" % (error, ticker)) from error + except Exception as exc: + raise FTError("Unexpected error for ticker %s: %s" % (ticker, exc)) from exc + + def get_historical_price( + self, ticker: str, time: datetime.datetime + ) -> Optional[source.SourcePrice]: + """Return the latest historical price found for the symbol at the given date.""" + try: + # Normalize 'time' to be timezone-aware (UTC) if it isn't already. + if time.tzinfo is None: + time = time.replace(tzinfo=datetime.timezone.utc) + + # API 'days' is lookback from today. + days_diff = (datetime.datetime.now(datetime.timezone.utc) - time).days + 7 + days_diff = max(days_diff, 1) + + # Fetch history + history = self._fetch_history(ticker, max(days_diff, 30)) + if not history: + return None + + # Sort history just in case API returns unsorted (unlikely but safe) + history.sort(key=lambda x: x["time"]) + + best_match = {} + for entry in reversed(history): + if entry["time"] <= time: + best_match = entry + break + + if best_match: + # Check if it's within a reasonable range (e.g. 7 days as in ft.py base) + if (time - best_match["time"]).days <= 7: + return source.SourcePrice(best_match["price"], best_match["time"], None) + + return None + except FTError as error: + raise FTError("%s (ticker: %s)" % (error, ticker)) from error + except Exception as exc: + raise FTError( + "Unexpected error for ticker %s at %s: %s" % (ticker, time, exc) + ) from exc diff --git a/beanprice/sources/ft_test.py b/beanprice/sources/ft_test.py new file mode 100644 index 0000000..edc6d9c --- /dev/null +++ b/beanprice/sources/ft_test.py @@ -0,0 +1,134 @@ + +__copyright__ = "Copyright (C) 2026 Roman Medvedev" +__license__ = "GNU GPLv2" + +import datetime +from decimal import Decimal +import unittest +from unittest import mock + +from beanprice.sources import ft + +class TestFTSource(unittest.TestCase): + def setUp(self): + self.source = ft.Source() + + @mock.patch('beanprice.sources.ft.get_url') + @mock.patch('beanprice.sources.ft.post_json') + def test_get_latest_price(self, mock_post, mock_get): + # Mock _get_xid + mock_get.return_value = 'var xid = "123456";' + + # Mock _fetch_history response + mock_post.return_value = """ + { + "Dates": ["2025-12-29T00:00:00"], + "Elements": [ + { + "Type": "price", + "ComponentSeries": [ + { + "Type": "Close", + "Values": [123.45] + } + ] + } + ] + } + """ + + price = self.source.get_latest_price("TEST:EXCH") + + self.assertIsNotNone(price) + self.assertEqual(price.price, Decimal("123.45")) + self.assertEqual( + price.time, + datetime.datetime(2025, 12, 29, 0, 0, 0, tzinfo=datetime.timezone.utc) + ) + + @mock.patch('beanprice.sources.ft.get_url') + @mock.patch('beanprice.sources.ft.post_json') + def test_get_historical_price_naive(self, mock_post, mock_get): + """Test get_historical_price with a naive datetime.""" + mock_get.return_value = 'var xid = "123456";' + + # Return a date slightly before the requested date + mock_post.return_value = """ + { + "Dates": ["2025-12-28T00:00:00"], + "Elements": [ + { + "Type": "price", + "ComponentSeries": [ + { + "Type": "Close", + "Values": [120.00] + } + ] + } + ] + } + """ + + # Requesting 2025-12-29 (Naive) + time_naive = datetime.datetime(2025, 12, 29) + price = self.source.get_historical_price("TEST:EXCH", time_naive) + + self.assertIsNotNone(price) + self.assertEqual(price.price, Decimal("120.00")) + # Result should be aware (UTC) + self.assertEqual( + price.time, + datetime.datetime(2025, 12, 28, 0, 0, 0, tzinfo=datetime.timezone.utc) + ) + + @mock.patch('beanprice.sources.ft.get_url') + @mock.patch('beanprice.sources.ft.post_json') + def test_get_historical_price_aware(self, mock_post, mock_get): + """Test get_historical_price with an aware datetime.""" + mock_get.return_value = 'var xid = "123456";' + + # Return a date slightly before the requested date + mock_post.return_value = """ + { + "Dates": ["2025-12-28T00:00:00"], + "Elements": [ + { + "Type": "price", + "ComponentSeries": [ + { + "Type": "Close", + "Values": [120.00] + } + ] + } + ] + } + """ + + # Requesting 2025-12-29 (Aware UTC) + time_aware = datetime.datetime(2025, 12, 29, tzinfo=datetime.timezone.utc) + price = self.source.get_historical_price("TEST:EXCH", time_aware) + + self.assertIsNotNone(price) + self.assertEqual(price.price, Decimal("120.00")) + + @mock.patch('beanprice.sources.ft.get_url') + def test_get_xid_regex_variations(self, mock_get): + """Test different variations of XID format in tearsheet.""" + # Standard + mock_get.return_value = 'data-mod-config="{xid: \'123456\'}"' + self.assertEqual(self.source._get_xid("A"), "123456") + + # With quotes + self.source._xid_cache.clear() + mock_get.return_value = 'xid="654321"' + self.assertEqual(self.source._get_xid("B"), "654321") + + # HTML encoded + self.source._xid_cache.clear() + mock_get.return_value = '"xid":"987654"' + self.assertEqual(self.source._get_xid("C"), "987654") + +if __name__ == '__main__': + unittest.main()