-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpubmed_central.py
More file actions
82 lines (66 loc) · 2.31 KB
/
pubmed_central.py
File metadata and controls
82 lines (66 loc) · 2.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import tarfile
import tempfile
from ftplib import FTP
import boto3
from pathlib import Path
# Destination S3 bucket for every uploaded XML file.
S3_BUCKET = "openalex-ingest"

# Directories, relative to /pub/pmc on the FTP server, that are scanned for
# .tar.gz archives.
PMC_SUBDIRS = [
    "oa_bulk/oa_comm/xml",
    "oa_bulk/oa_noncomm/xml",
    "oa_bulk/oa_other/xml",
    "manuscript/xml"
]

# Shared S3 client, created once at import time and used for both the
# upload calls and the "already processed" existence check.
s3_client = boto3.client("s3")
def pubmed_ftp_client():
    """Open a logged-in FTP session to ``ftp.ncbi.nlm.nih.gov``.

    ``login()`` is called without arguments, so ftplib's default
    (anonymous) credentials are used.

    Returns:
        The connected ``ftplib.FTP`` instance. The caller owns the
        connection and is responsible for closing it.
    """
    session = FTP()
    session.connect("ftp.ncbi.nlm.nih.gov")
    session.login()
    return session
def list_tarballs():
    """List every ``.tar.gz`` archive in the configured PMC directories.

    Each entry of ``PMC_SUBDIRS`` is visited under ``/pub/pmc`` on the FTP
    server and its directory listing is filtered by file extension.

    Returns:
        A list of ``(ftp_path, subdir, fname)`` tuples, where ``ftp_path`` is
        ``/pub/pmc/{subdir}/{fname}``, ``subdir`` is the ``PMC_SUBDIRS`` entry
        the archive was found in, and ``fname`` is the name as returned by
        the directory listing.
    """
    tarballs = []
    # An FTP object is a context manager: leaving the block sends QUIT and
    # closes the socket even when a cwd()/nlst() call raises, so a failed
    # listing no longer leaks the connection.
    with pubmed_ftp_client() as ftp:
        for subdir in PMC_SUBDIRS:
            full_path = f"/pub/pmc/{subdir}"
            ftp.cwd(full_path)
            tarballs.extend(
                (f"{full_path}/{fname}", subdir, fname)
                for fname in ftp.nlst()
                if fname.endswith(".tar.gz")
            )
    return tarballs
def download_tarball(ftp_path, local_path):
    """Download one file from the FTP server to a local path.

    Args:
        ftp_path: Path of the file on the FTP server, used verbatim in the
            ``RETR`` command.
        local_path: Local file to write. It is opened in ``"wb"`` mode, so
            any existing content is overwritten.
    """
    # Both resources are context-managed so the FTP connection is closed
    # even when opening the local file or the transfer itself fails. The
    # local file is closed first, then the FTP session.
    with pubmed_ftp_client() as ftp, open(local_path, "wb") as f:
        ftp.retrbinary(f"RETR {ftp_path}", f.write)
def extract_and_upload(local_tarball, s3_prefix):
    """Upload every XML file inside a gzip-compressed tarball to S3.

    Regular-file members whose name ends in ``.nxml`` or ``.xml`` are
    uploaded to ``S3_BUCKET`` under the key ``{s3_prefix}/{member name}``;
    every other member is skipped. One line is printed per uploaded object.

    Args:
        local_tarball: Path to the local ``.tar.gz`` archive.
        s3_prefix: Key prefix placed in front of each member name, joined
            with a single ``/``.
    """
    with tarfile.open(local_tarball, "r:gz") as tar:
        # Iterate the archive lazily rather than via getmembers(), which
        # has to read through the whole compressed archive before the first
        # upload can start.
        for member in tar:
            if not (member.isfile() and member.name.endswith(('.nxml', '.xml'))):
                continue
            extracted = tar.extractfile(member)
            if extracted is None:
                continue
            s3_key = f"{s3_prefix}/{member.name}"
            # Close each member's reader as soon as its upload finishes.
            with extracted:
                s3_client.upload_fileobj(extracted, S3_BUCKET, s3_key)
            print(f"Uploaded to s3://{S3_BUCKET}/{s3_key}")
def already_processed(s3_prefix):
    """Report whether any object already exists under ``s3_prefix``.

    Only a single key is requested, so one uploaded object is enough for the
    prefix to count as processed.

    NOTE(review): because of that, an archive whose upload was interrupted
    part-way is also reported as processed — confirm this is acceptable.

    Args:
        s3_prefix: Key prefix of one archive, with or without a trailing
            ``/``.

    Returns:
        ``True`` if at least one object in ``S3_BUCKET`` has a key under the
        prefix, ``False`` otherwise.
    """
    # Uploaded keys have the form "{s3_prefix}/{member name}". Anchoring the
    # query at that "/" keeps an archive whose name merely starts with
    # another archive's name from being mistaken for it.
    folder_prefix = s3_prefix
    if s3_prefix and not s3_prefix.endswith("/"):
        folder_prefix = f"{s3_prefix}/"
    response = s3_client.list_objects_v2(
        Bucket=S3_BUCKET, Prefix=folder_prefix, MaxKeys=1
    )
    return "Contents" in response
def main():
    """Mirror every not-yet-processed PMC tarball into S3.

    The archives are listed over FTP. For each one, an S3 prefix is derived
    from its sub-directory and file name, and the archive is skipped when
    something already exists under that prefix. Otherwise it is downloaded
    to a temporary file, its XML members are uploaded, and the temporary
    file is removed whether or not the processing succeeded.
    """
    pending = list_tarballs()
    print(f"Found {len(pending)} tarballs to process.")

    for remote_path, pmc_subdir, tar_name in pending:
        stem = tar_name.replace('.tar.gz', '')
        destination = f"pubmed-central/{pmc_subdir}/{stem}"

        if not already_processed(destination):
            print(f"Processing: {remote_path}")

            # Reserve a named scratch file and close it right away; the
            # download helper reopens it by path.
            scratch = tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False)
            scratch_path = scratch.name
            scratch.close()

            try:
                download_tarball(remote_path, scratch_path)
                extract_and_upload(scratch_path, destination)
            finally:
                os.remove(scratch_path)
        else:
            print(f"Skipping {remote_path} — already processed.")
# Run the ingest only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()