Commit 822127c

DISCO-3404 Resource optimization for airflow job (#861)
* DISCO-3404 Optimize navigational suggestion job for better resource management
* DISCO-3404 Improve memory handling while processing favicons in nav-suggestion job
1 parent b58274c commit 822127c

2 files changed: +85 -38 lines changed

merino/jobs/navigational_suggestions/domain_metadata_extractor.py

+69 -35
@@ -176,6 +176,9 @@ class DomainMetadataExtractor:
         "Service unavailable",
     ]
 
+    # Constants for favicon URL validation
+    MANIFEST_JSON_BASE64_MARKER = "/application/manifest+json;base64,"
+
     # List of blocked (second level) domains
     blocked_domains: set[str]
     scraper: Scraper
@@ -225,15 +228,15 @@ async def _extract_favicons(self, scraped_url: str) -> list[dict[str, Any]]:
 
         for favicon in favicon_data.links:
             favicon_url = favicon["href"]
-            if favicon_url.startswith("data:"):
+            if self._is_problematic_favicon_url(favicon_url):
                 continue
             if not favicon_url.startswith("http") and not favicon_url.startswith("//"):
                 favicon["href"] = urljoin(scraped_url, favicon_url)
             favicons.append(favicon)
 
         for favicon in favicon_data.metas:
             favicon_url = favicon["content"]
-            if favicon_url.startswith("data:"):
+            if self._is_problematic_favicon_url(favicon_url):
                 continue
             if not favicon_url.startswith("http") and not favicon_url.startswith("//"):
                 favicon["href"] = urljoin(scraped_url, favicon_url)
@@ -254,8 +257,8 @@ async def _extract_favicons(self, scraped_url: str) -> list[dict[str, Any]]:
                     manifest_urls.append(manifest_absolute_url)
 
         if manifest_tasks:
-            # Use smaller chunk size for manifest tasks to limit resource usage
-            chunk_size = 10
+            # Reduce chunk size for manifest tasks to limit resource consumption
+            chunk_size = 5
             for i in range(0, len(manifest_tasks), chunk_size):
                 chunk = manifest_tasks[i : i + chunk_size]
                 chunk_urls = manifest_urls[i : i + chunk_size]
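
The hunk above caps how many manifest fetches run at once by slicing the task list into fixed-size chunks and awaiting each slice before starting the next. A minimal standalone sketch of that pattern, assuming the awaitables are already built (process_in_chunks and the task list are hypothetical, not Merino APIs):

import asyncio

async def process_in_chunks(tasks: list, chunk_size: int = 5) -> list:
    """Await tasks in fixed-size chunks so that at most chunk_size
    coroutines are in flight at any one time."""
    results = []
    for i in range(0, len(tasks), chunk_size):
        # Only the current slice is awaited; the previous slice has
        # finished and released its connections before this one starts.
        results.extend(
            await asyncio.gather(*tasks[i : i + chunk_size], return_exceptions=True)
        )
    return results
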
@@ -280,6 +283,11 @@ async def _extract_favicons(self, scraped_url: str) -> list[dict[str, Any]]:
             for scraped_favicon in scraped_favicons_result:
                 # Check if the favicon URL already contains a scheme
                 favicon_src = scraped_favicon.get("src", "")
+
+                # Skip problematic data URLs or invalid formats
+                if self._is_problematic_favicon_url(favicon_src):
+                    continue
+
                 if favicon_src.startswith(("http://", "https://")):
                     favicon_url = favicon_src
                 else:
@@ -305,39 +313,47 @@ async def _get_best_favicon(self, favicons: list[dict[str, Any]], min_width: int) -> str:
         best_favicon_url = ""
         best_favicon_width = 0
 
-        # Process favicons in chunks to limit concurrent connections
-        chunk_size = 20
-        all_favicon_images = []
+        # Process favicons in smaller chunks to limit concurrent connections and memory usage
+        chunk_size = 10
 
-        for chunk_urls in itertools.batched(urls, chunk_size):
+        for chunk_idx, chunk_urls in enumerate(itertools.batched(urls, chunk_size)):
             chunk_images = await self.favicon_downloader.download_multiple_favicons(
                 list(chunk_urls)
             )
-            all_favicon_images.extend(chunk_images)
 
-        favicon_images = all_favicon_images
+            # Calculate the offset in the favicons list for this chunk
+            favicon_offset = chunk_idx * chunk_size
+
+            # Process this chunk immediately
+            for i, (image, url) in enumerate(zip(chunk_images, chunk_urls)):
+                if image is None or "image/" not in image.content_type:
+                    continue
+
+                # First priority: If favicon is an SVG and not masked, select it immediately
+                if (
+                    image.content_type == "image/svg+xml"
+                    and (i + favicon_offset) not in masked_svg_indices
+                ):
+                    # Clear variables to help with garbage collection
+                    del chunk_images
 
-        # First pass: Look for SVG favicons (they are priority)
-        for i, (favicon, image) in enumerate(zip(favicons, favicon_images)):
-            if image is None or "image/" not in image.content_type:
-                continue
+                    # Return immediately on finding a good SVG
+                    return url
 
-            # If favicon is an SVG and not masked, return it immediately
-            if image.content_type == "image/svg+xml" and i not in masked_svg_indices:
-                return urls[i]
+                # Second priority: Track the highest resolution bitmap favicon
+                try:
+                    width = self._get_favicon_smallest_dimension(image)
+                    if width > best_favicon_width:
+                        best_favicon_url = url
+                        best_favicon_width = width
+                except Exception as e:
+                    logger.warning(f"Exception {e} for favicon at position {i+favicon_offset}")
 
-        # Second pass: Look for the highest resolution bitmap favicon
-        for i, (favicon, image) in enumerate(zip(favicons, favicon_images)):
-            if image is None or "image/" not in image.content_type:
-                continue
+            # Explicitly clear chunk_images to free memory immediately
+            del chunk_images
 
-            try:
-                width = self._get_favicon_smallest_dimension(image)
-                if width > best_favicon_width:
-                    best_favicon_url = urls[i]
-                    best_favicon_width = width
-            except Exception as e:
-                logger.warning(f"Exception {e} for favicon {favicon}")
+            # Add a delay between batches to prevent network resource exhaustion
+            await asyncio.sleep(0.5)
 
         logger.debug(f"Best favicon url: {best_favicon_url}, width: {best_favicon_width}")
         return best_favicon_url if best_favicon_width >= min_width else ""
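
This rework replaces a download-everything-then-scan-twice flow with a single pass that downloads one chunk, scores it, and drops it before the next fetch. A simplified sketch of that shape, with download_chunk and smallest_dimension as hypothetical stand-ins for the favicon downloader and the dimension helper:

import asyncio
import itertools

async def best_image_url(urls: list[str], min_width: int) -> str:
    best_url, best_width = "", 0
    for chunk in itertools.batched(urls, 10):  # itertools.batched needs Python 3.12+
        images = await download_chunk(list(chunk))  # hypothetical downloader
        for url, image in zip(chunk, images):
            if image is None:
                continue
            if image.content_type == "image/svg+xml":
                return url  # an acceptable SVG wins immediately
            width = smallest_dimension(image)  # hypothetical helper
            if width > best_width:
                best_url, best_width = url, width
        del images  # release this chunk's image bytes before the next fetch
        await asyncio.sleep(0.5)  # brief back-off between batches
    return best_url if best_width >= min_width else ""
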
@@ -348,10 +364,15 @@ async def _get_favicon(self, scraped_url: str, min_width: int) -> str:
         with the highest resolution.
         """
         favicons: list[dict[str, Any]] = await self._extract_favicons(scraped_url)
-        logger.info(
-            f"{len(favicons)} favicons extracted for {scraped_url}. Favicons are: {favicons}"
-        )
-        return await self._get_best_favicon(favicons, min_width)
+        logger.info(f"{len(favicons)} favicons extracted for {scraped_url}")
+
+        # Get the best favicon
+        result = await self._get_best_favicon(favicons, min_width)
+
+        # Explicitly clear the favicons list to free memory
+        favicons.clear()
+
+        return result
 
     def _extract_title(self) -> Optional[str]:
         """Extract title for a url"""
@@ -379,6 +400,10 @@ def _is_domain_blocked(self, domain: str, suffix: str) -> bool:
         second_level_domain: str = self._get_second_level_domain(domain, suffix)
         return second_level_domain in self.blocked_domains
 
+    def _is_problematic_favicon_url(self, favicon_url: str) -> bool:
+        """Check if a favicon URL is problematic (data URL or base64 manifest)"""
+        return favicon_url.startswith("data:") or self.MANIFEST_JSON_BASE64_MARKER in favicon_url
+
     def get_domain_metadata(
         self, domains_data: list[dict[str, Any]], favicon_min_width: int
     ) -> list[dict[str, Optional[str]]]:
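
To illustrate what the new helper screens out, here are a few hypothetical inputs (extractor stands for a DomainMetadataExtractor instance; the URLs are made up):

extractor._is_problematic_favicon_url("data:image/png;base64,iVBORw0KGgo=")
# True: inline data URL, not fetchable as a favicon

extractor._is_problematic_favicon_url("https://example.com/application/manifest+json;base64,...")
# True: contains the embedded base64 manifest marker

extractor._is_problematic_favicon_url("https://example.com/favicon.ico")
# False: ordinary fetchable URL
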
@@ -389,21 +414,30 @@ async def _process_domains_async(
         self, domains_data: list[dict[str, Any]], favicon_min_width: int
     ) -> list[dict[str, Optional[str]]]:
         """Process domains in chunks to limit resource consumption."""
-        chunk_size = 100
+        # Reduce batch size to decrease memory consumption and network load
+        chunk_size = 25
         filtered_results: list[dict[str, Optional[str]]] = []
 
         for i in range(0, len(domains_data), chunk_size):
-            chunk = domains_data[i : i + chunk_size]
+            end_idx = min(i + chunk_size, len(domains_data))
+            chunk = domains_data[i:end_idx]
             tasks = [
                 self._process_single_domain(domain_data, favicon_min_width)
                 for domain_data in chunk
             ]
             logger.info(
                 f"Processing chunk of {len(chunk)} domains concurrently "
-                f"({i + 1}-{min(i + chunk_size, len(domains_data))} of {len(domains_data)})"
+                f"({i + 1}-{end_idx} of {len(domains_data)})"
             )
+
+            # Process current chunk with gather
             chunk_results = await asyncio.gather(*tasks, return_exceptions=True)
 
+            # Add a longer delay between chunks to allow system resources to recover
+            if end_idx < len(domains_data):
+                await asyncio.sleep(2.0)
+
+            # Process results
             for result in chunk_results:
                 if isinstance(result, Exception):
                     logger.error(f"Error processing domain: {result}")

merino/jobs/navigational_suggestions/utils.py

+16 -3
@@ -24,7 +24,7 @@
     "Sec-Fetch-User": "?1",
 }
 
-TIMEOUT: int = 10
+TIMEOUT: int = 15
 
 logger = logging.getLogger(__name__)
 
@@ -83,7 +83,20 @@ async def download_multiple_favicons(self, urls: List[str]) -> List[Optional[Image]]:
         Returns:
             List of favicon images
         """
-        tasks = [self.download_favicon(url) for url in urls]
+        # Implement stricter semaphore to limit concurrent connections
+        semaphore = asyncio.Semaphore(5)
+
+        async def download_with_semaphore(url: str) -> Optional[Image]:
+            try:
+                async with semaphore:
+                    return await self.download_favicon(url)
+            except Exception as e:
+                logger.warning(f"Unhandled exception in download_with_semaphore: {e}")
+                return None
+
+        # Create tasks with semaphore control
+        controlled_tasks = [download_with_semaphore(url) for url in urls]
+
         # Handle the exceptions internally to maintain return type consistency
-        results = await asyncio.gather(*tasks, return_exceptions=True)
+        results = await asyncio.gather(*controlled_tasks, return_exceptions=True)
         return [None if isinstance(r, BaseException) else r for r in results]
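
The semaphore is the key piece of this file: gather() still creates one coroutine per URL, but only five of them may hold a connection at any moment, and per-URL failures collapse to None so the result list stays aligned with the input. A self-contained sketch of the same pattern (fetch is a hypothetical download coroutine):

import asyncio
from typing import Optional

async def download_all(urls: list[str], limit: int = 5) -> list[Optional[bytes]]:
    semaphore = asyncio.Semaphore(limit)

    async def bounded(url: str) -> Optional[bytes]:
        try:
            async with semaphore:  # at most `limit` downloads run concurrently
                return await fetch(url)  # hypothetical download coroutine
        except Exception:
            return None  # failures map to None, keeping output aligned with urls

    return await asyncio.gather(*(bounded(u) for u in urls))
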
