-
Notifications
You must be signed in to change notification settings - Fork 6.3k
/
Copy path: load_yellow_taxi_data.py
113 lines (84 loc) · 3.58 KB
/
load_yellow_taxi_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os
import sys
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from google.cloud import storage
from google.api_core.exceptions import NotFound, Forbidden
import time
# Change this to your bucket name
BUCKET_NAME = "dezoomcamp_hw3_2025"
# If you authenticated through the GCP SDK you can comment out these two lines
CREDENTIALS_FILE = "gcs.json"
client = storage.Client.from_service_account_json(CREDENTIALS_FILE)
# If commented initialize client with the following
# client = storage.Client(project='zoomcamp-mod3-datawarehouse')
# NYC TLC trip-data CDN; the month suffix (e.g. "01") + ".parquet" is appended per file.
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-"
# Zero-padded months "01".."06" — January through June 2024.
MONTHS = [f"{i:02d}" for i in range(1, 7)]
# Files are downloaded into the current working directory.
DOWNLOAD_DIR = "."
# 8 MiB resumable-upload chunk size for the GCS blobs.
CHUNK_SIZE = 8 * 1024 * 1024
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
# Bucket handle only — no API call is made until the bucket is actually used.
bucket = client.bucket(BUCKET_NAME)
def download_file(month):
    """Download one month's 2024 yellow-taxi parquet file into DOWNLOAD_DIR.

    Returns the local file path on success, or None when the download fails
    (the failure is printed, not raised, so callers can simply skip it).
    """
    source_url = f"{BASE_URL}{month}.parquet"
    destination = os.path.join(DOWNLOAD_DIR, f"yellow_tripdata_2024-{month}.parquet")
    try:
        print(f"Downloading {source_url}...")
        urllib.request.urlretrieve(source_url, destination)
        print(f"Downloaded: {destination}")
    except Exception as exc:
        # Best-effort: report and signal failure instead of aborting the batch.
        print(f"Failed to download {source_url}: {exc}")
        return None
    return destination
def create_bucket(bucket_name):
    """Ensure *bucket_name* exists and belongs to the current project.

    Creates the bucket when it does not exist yet. Exits the program when
    the name is already taken by a bucket outside this project (either
    visible but foreign, or entirely inaccessible).
    """
    try:
        # Fetch bucket details, then confirm it is one of our project's buckets.
        bucket = client.get_bucket(bucket_name)
        owned = any(bckt.id == bucket_name for bckt in client.list_buckets())
        if not owned:
            print(
                f"A bucket with the name '{bucket_name}' already exists, but it does not belong to your project."
            )
            sys.exit(1)
        print(
            f"Bucket '{bucket_name}' exists and belongs to your project. Proceeding..."
        )
    except NotFound:
        # Bucket is missing entirely — create it now.
        bucket = client.create_bucket(bucket_name)
        print(f"Created bucket '{bucket_name}'")
    except Forbidden:
        # Bucket exists but we cannot even read its details: the name is taken.
        print(
            f"A bucket with the name '{bucket_name}' exists, but it is not accessible. Bucket name is taken. Please try a different bucket name."
        )
        sys.exit(1)
def verify_gcs_upload(blob_name):
    """Return True when *blob_name* is present in the target bucket."""
    candidate = storage.Blob(bucket=bucket, name=blob_name)
    return candidate.exists(client)
def upload_to_gcs(file_path, max_retries=3):
    """Upload *file_path* to the GCS bucket, verifying and retrying on failure.

    Makes up to *max_retries* attempts; each attempt uploads the file and then
    checks the blob actually exists in the bucket. Pauses 5 seconds between
    attempts. Prints a give-up message if all attempts fail.

    The bucket is assumed to exist already — __main__ calls create_bucket()
    once before any uploads. (The previous per-file create_bucket() call was
    redundant and raced when invoked from several worker threads at once.)
    """
    blob_name = os.path.basename(file_path)
    blob = bucket.blob(blob_name)
    # Larger chunk size speeds up the multi-megabyte parquet uploads.
    blob.chunk_size = CHUNK_SIZE
    for attempt in range(1, max_retries + 1):
        try:
            print(f"Uploading {file_path} to {BUCKET_NAME} (Attempt {attempt})...")
            blob.upload_from_filename(file_path)
            print(f"Uploaded: gs://{BUCKET_NAME}/{blob_name}")
            if verify_gcs_upload(blob_name):
                print(f"Verification successful for {blob_name}")
                return
            print(f"Verification failed for {blob_name}, retrying...")
        except Exception as e:
            # Best-effort: report the failure and let the retry loop continue.
            print(f"Failed to upload {file_path} to GCS: {e}")
        if attempt < max_retries:
            # Back off before retrying; skip the pointless sleep after the
            # final attempt (the original slept 5s even when giving up).
            time.sleep(5)
    print(f"Giving up on {file_path} after {max_retries} attempts.")
if __name__ == "__main__":
    # Make sure the destination bucket exists before any work starts.
    create_bucket(BUCKET_NAME)

    # Download the six monthly files in parallel.
    with ThreadPoolExecutor(max_workers=4) as pool:
        downloaded = list(pool.map(download_file, MONTHS))

    # Upload only the files that downloaded successfully (drop None entries).
    successful = [path for path in downloaded if path]
    with ThreadPoolExecutor(max_workers=4) as pool:
        pool.map(upload_to_gcs, successful)

    print("All files processed and verified.")