Skip to content

Commit

Permalink
Ugly fix for out-of-order records in redirects
Browse files Browse the repository at this point in the history
  • Loading branch information
Mr0grog committed Feb 9, 2025
1 parent bc29ed4 commit f4a9960
Showing 1 changed file with 70 additions and 13 deletions.
83 changes: 70 additions & 13 deletions web_monitoring/cli/warc_import.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from argparse import ArgumentParser
from dataclasses import dataclass, field
from functools import lru_cache
import gzip
from itertools import islice
import json
Expand Down Expand Up @@ -132,22 +133,20 @@ def redirect_target(self) -> str:

return ''

def add(self, record: ArcWarcRecord, index: int, reader: ArchiveIterator) -> None:
def add(self, record: ArcWarcRecord, index: int, offset: int, length: int, body: bytes | None) -> None:
self.last_index = index
# The body must be read *before* getting record offset/length!
# Getting either of those first consumes the stream and throws it away.
body = record.content_stream().read()
meta = {
'id': record.rec_headers.get('WARC-Record-ID'),
'type': record.rec_type,
'offset': reader.get_record_offset(),
'length': reader.get_record_length(),
'offset': offset,
'length': length,
}
if record.rec_type == 'request':
self.request = record
self.records.insert(0, record)
self.metadata.insert(0, meta)
elif record.rec_type == 'response':
assert body is not None, f'Response record had no body ({meta["id"]})'
self.response = record
self.response_body = body
position = 1 if self.request else 0
Expand Down Expand Up @@ -187,6 +186,13 @@ def parse_warc_fields(record: ArcWarcRecord) -> StatusAndHeaders:
return parser.parse(record.content_stream(), 'WARC/1.1').headers


@lru_cache(maxsize=256)
def extract_record(warc: str, offset: int) -> tuple[ArcWarcRecord, bytes]:
    """
    Load the single WARC record that starts at byte ``offset`` in a WARC file.

    Results are memoized (up to 256 entries) on the ``(warc, offset)`` pair, so
    repeated lookups of the same record do not re-read the file.

    Parameters
    ----------
    warc
        Path to the WARC file to read from.
    offset
        Byte position in the file at which the desired record begins.

    Returns
    -------
    tuple
        The parsed record and the full bytes of its content stream. The file
        is closed when this function returns, so callers should use the
        returned bytes rather than reading from the record again.

    Raises
    ------
    StopIteration
        If no record can be read starting at ``offset``.
    """
    with open(warc, 'rb') as file:
        # Position the file at the requested record. Without this, the
        # iterator always yields the first record in the file, no matter
        # which offset was asked for.
        file.seek(offset)
        record = next(iter(ArchiveIterator(file)))
        # Read the body while the file is still open.
        return record, record.content_stream().read()


def each_redirect_chain(warc: str, seeds: set[str]) -> Generator[RedirectChain, None, None]:
max_open_request_age = 250
# TODO: maybe should be using the WARC-Concurrent-To header to match up
Expand All @@ -201,6 +207,8 @@ def each_redirect_chain(warc: str, seeds: set[str]) -> Generator[RedirectChain,
seen_seeds: set[str] = set()
warc_info: dict[str, Any] = {}

response_index = {}

warc_path = Path(warc).absolute()
warc_info['warc_name'] = warc_path.name
if warc_path.parent.name == 'archive' and warc_path.parent.parent.parent.name == 'collections':
Expand All @@ -220,6 +228,12 @@ def each_redirect_chain(warc: str, seeds: set[str]) -> Generator[RedirectChain,
continue

target = record.rec_headers.get_header('WARC-Target-URI')
body = record.content_stream().read()
offset = reader.get_record_offset()

if target and record.rec_type == 'response':
response_index[target] = offset

request = open_requests.get(target)
if request is None and target not in seeds and target not in open_redirects:
continue
Expand All @@ -228,7 +242,13 @@ def each_redirect_chain(warc: str, seeds: set[str]) -> Generator[RedirectChain,
request = RequestRecords(target, warc_info=warc_info)
open_requests[target] = request

request.add(record, index, reader)
request.add(
record,
index=index,
offset=offset,
length=reader.get_record_length(),
body=body
)

chain = open_redirects.get(target)
if not chain:
Expand All @@ -243,14 +263,50 @@ def each_redirect_chain(warc: str, seeds: set[str]) -> Generator[RedirectChain,
if index >= max_open_request_age:
for chain in set(open_redirects.values()):
last = chain.requests[-1]
if last.last_index + max_open_request_age < index and not last.redirect_target:
for request in chain.requests:
del open_redirects[request.url]

yield chain
if last.last_index + max_open_request_age < index:
for _ in range(10):
redirect = chain.requests[-1].redirect_target
if redirect:
offset = response_index.get(redirect)
if offset:
target_record, body = extract_record(warc_path, offset)
request = RequestRecords(redirect)
request.add(target_record, last.last_index, offset=offset, length=0, body=body)
chain.add(request)
if request.redirect_target:
open_redirects[request.redirect_target] = chain
else:
break
else:
break

if not chain.requests[-1].redirect_target:
for request in chain.requests:
del open_redirects[request.url]

yield chain

for chain in set(open_redirects.values()):
yield chain
for _ in range(10):
redirect = chain.requests[-1].redirect_target
if redirect:
offset = response_index.get(redirect)
if offset:
target_record, body = extract_record(warc_path, offset)
request = RequestRecords(redirect)
request.add(target_record, last.last_index, offset=offset, length=0, body=body)
chain.add(request)
else:
break
else:
break

if not chain.requests[-1].response:
logger.warning(f'Chain of requests had no final response (started with {chain.requests[0].url})')
elif chain.requests[-1].redirect_target:
logger.warning(f'Chain of requests ended with a redirect (started with {chain.requests[0].url})')
else:
yield chain

# What's happening with not always getting as many chains as seeds?
missing_seeds = seeds - seen_seeds
Expand Down Expand Up @@ -371,6 +427,7 @@ def main():

db_client = db.Client.from_env()

# TODO: preload some hashes from DB
storage = S3HashStore(
args.archive_s3,
gzip=True,
Expand Down

0 comments on commit f4a9960

Please sign in to comment.