-
Notifications
You must be signed in to change notification settings - Fork 6.3k
/
Copy path06_gcp_taxi_scheduled.yaml
249 lines (231 loc) · 18.7 KB
/
06_gcp_taxi_scheduled.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# Kestra flow: loads one month of NYC taxi trip data (yellow or green) from
# the public CSV releases into GCS and then into BigQuery.
id: '06_gcp_taxi_scheduled'
namespace: zoomcamp
description: |
  Best to add a label `backfill:true` from the UI to track executions created via a backfill.
  CSV data used here comes from: https://github.com/DataTalksClub/nyc-tlc-data/releases
# Flow inputs. `taxi` selects which dataset is processed; each schedule
# trigger of this flow supplies its own value for it.
inputs:
- id: taxi
  type: SELECT
  displayName: Select taxi type
  defaults: green
  values:
  - yellow
  - green
# Reusable Pebble templates. Tasks expand them with render(vars.<name>).
# All of them depend on trigger.date, so they presumably only resolve for
# executions started by a Schedule trigger or a backfill - TODO confirm.
variables:
  # Monthly CSV name, e.g. green_tripdata_2019-01.csv
  file: >-
    {{inputs.taxi}}_tripdata_{{trigger.date | date('yyyy-MM')}}.csv
  # Destination object in the bucket named by the GCP_BUCKET_NAME KV entry.
  gcs_file: >-
    gs://{{kv('GCP_BUCKET_NAME')}}/{{vars.file}}
  # Dataset-qualified name of the per-month table, e.g. <dataset>.green_tripdata_2019_01
  table: >-
    {{kv('GCP_DATASET')}}.{{inputs.taxi}}_tripdata_{{trigger.date | date('yyyy_MM')}}
  # Output file of the `extract` task, looked up by the same name as `file`.
  data: >-
    {{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ (trigger.date | date('yyyy-MM')) ~ '.csv']}}
tasks:
# Attach `file` and `taxi` labels to the current execution.
- id: set_label
  type: io.kestra.plugin.core.execution.Labels
  labels:
    file: '{{render(vars.file)}}'
    taxi: '{{inputs.taxi}}'
# Download the gzipped CSV for the selected taxi type and month from the
# DataTalksClub release and decompress it into a file named after vars.file.
- id: extract
  type: io.kestra.plugin.scripts.shell.Commands
  taskRunner:
    type: io.kestra.plugin.core.runner.Process
  commands:
  # Folded scalar: the two lines below join with a single space into one command.
  - >-
    wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz
    | gunzip > {{render(vars.file)}}
  # Expose the produced CSV as a task output (read back through vars.data).
  outputFiles:
  - '*.csv'
# Copy the extracted CSV (vars.data) to its GCS location (vars.gcs_file).
- id: upload_to_gcs
  type: io.kestra.plugin.gcp.gcs.Upload
  from: '{{render(vars.data)}}'
  to: '{{render(vars.gcs_file)}}'
# Yellow-taxi branch: runs only when inputs.taxi is 'yellow'.
# Steps: ensure the partitioned target table exists, expose the uploaded CSV
# as an external table, materialise it with a row hash and source filename,
# then merge rows that are not yet present into the target table.
- id: if_yellow_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'yellow'}}"
  then:
  # Target table, created once and partitioned by pickup date.
  # Yellow cabs report through TPEP (see the tpep_* columns), so the VendorID
  # description refers to the TPEP provider.
  - id: bq_yellow_tripdata
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      CREATE TABLE IF NOT EXISTS `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.yellow_tripdata`
      (
          unique_row_id BYTES OPTIONS (description = 'A unique identifier for the trip, generated by hashing key trip attributes.'),
          filename STRING OPTIONS (description = 'The source filename from which the trip data was loaded.'),
          VendorID STRING OPTIONS (description = 'A code indicating the TPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
          tpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
          tpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
          passenger_count INTEGER OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
          trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
          RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
          store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. TRUE = store and forward trip, FALSE = not a store and forward trip'),
          PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
          DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
          payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
          fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
          extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
          mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
          tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
          tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
          improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
          total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
          congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
      )
      PARTITION BY DATE(tpep_pickup_datetime);
  # External table over the uploaded CSV; recreated on every run.
  # The header row is skipped and unknown values are ignored.
  - id: bq_yellow_table_ext
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      CREATE OR REPLACE EXTERNAL TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`
      (
          VendorID STRING OPTIONS (description = 'A code indicating the TPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
          tpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
          tpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
          passenger_count INTEGER OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
          trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
          RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
          store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. TRUE = store and forward trip, FALSE = not a store and forward trip'),
          PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
          DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
          payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
          fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
          extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
          mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
          tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
          tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
          improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
          total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
          congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
      )
      OPTIONS (
          format = 'CSV',
          uris = ['{{render(vars.gcs_file)}}'],
          skip_leading_rows = 1,
          ignore_unknown_values = TRUE
      );
  # Per-month native table: every external row plus an MD5 hash of vendor,
  # pickup/dropoff time and pickup/dropoff zone, and the source filename.
  - id: bq_yellow_table_tmp
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      CREATE OR REPLACE TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}`
      AS
      SELECT
        MD5(CONCAT(
          COALESCE(CAST(VendorID AS STRING), ""),
          COALESCE(CAST(tpep_pickup_datetime AS STRING), ""),
          COALESCE(CAST(tpep_dropoff_datetime AS STRING), ""),
          COALESCE(CAST(PULocationID AS STRING), ""),
          COALESCE(CAST(DOLocationID AS STRING), "")
        )) AS unique_row_id,
        "{{render(vars.file)}}" AS filename,
        *
      FROM `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`;
  # Insert only rows whose unique_row_id is not already in the target table,
  # so re-running the same month does not insert those rows again.
  - id: bq_yellow_merge
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      MERGE INTO `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.yellow_tripdata` T
      USING `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}` S
      ON T.unique_row_id = S.unique_row_id
      WHEN NOT MATCHED THEN
        INSERT (unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge)
        VALUES (S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime, S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID, S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.improvement_surcharge, S.total_amount, S.congestion_surcharge);
# Green-taxi branch: runs only when inputs.taxi is 'green'.
# Steps: ensure the partitioned target table exists, expose the uploaded CSV
# as an external table, materialise it with a row hash and source filename,
# then merge rows that are not yet present into the target table.
- id: if_green_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'green'}}"
  then:
  # Target table, created once and partitioned by pickup date.
  - id: bq_green_tripdata
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      CREATE TABLE IF NOT EXISTS `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.green_tripdata`
      (
          unique_row_id BYTES OPTIONS (description = 'A unique identifier for the trip, generated by hashing key trip attributes.'),
          filename STRING OPTIONS (description = 'The source filename from which the trip data was loaded.'),
          VendorID STRING OPTIONS (description = 'A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
          lpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
          lpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
          store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip'),
          RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
          PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
          DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
          passenger_count INT64 OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
          trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
          fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
          extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
          mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
          tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
          tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
          ehail_fee NUMERIC,
          improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
          total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
          payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
          trip_type STRING OPTIONS (description = 'A code indicating whether the trip was a street-hail or a dispatch that is automatically assigned based on the metered rate in use but can be altered by the driver. 1= Street-hail 2= Dispatch'),
          congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
      )
      PARTITION BY DATE(lpep_pickup_datetime);
  # External table over the uploaded CSV; recreated on every run.
  # The header row is skipped and unknown values are ignored.
  - id: bq_green_table_ext
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      CREATE OR REPLACE EXTERNAL TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`
      (
          VendorID STRING OPTIONS (description = 'A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.'),
          lpep_pickup_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was engaged'),
          lpep_dropoff_datetime TIMESTAMP OPTIONS (description = 'The date and time when the meter was disengaged'),
          store_and_fwd_flag STRING OPTIONS (description = 'This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip'),
          RatecodeID STRING OPTIONS (description = 'The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride'),
          PULocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was engaged'),
          DOLocationID STRING OPTIONS (description = 'TLC Taxi Zone in which the taximeter was disengaged'),
          passenger_count INT64 OPTIONS (description = 'The number of passengers in the vehicle. This is a driver-entered value.'),
          trip_distance NUMERIC OPTIONS (description = 'The elapsed trip distance in miles reported by the taximeter.'),
          fare_amount NUMERIC OPTIONS (description = 'The time-and-distance fare calculated by the meter'),
          extra NUMERIC OPTIONS (description = 'Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges'),
          mta_tax NUMERIC OPTIONS (description = '$0.50 MTA tax that is automatically triggered based on the metered rate in use'),
          tip_amount NUMERIC OPTIONS (description = 'Tip amount. This field is automatically populated for credit card tips. Cash tips are not included.'),
          tolls_amount NUMERIC OPTIONS (description = 'Total amount of all tolls paid in trip.'),
          ehail_fee NUMERIC,
          improvement_surcharge NUMERIC OPTIONS (description = '$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.'),
          total_amount NUMERIC OPTIONS (description = 'The total amount charged to passengers. Does not include cash tips.'),
          payment_type INTEGER OPTIONS (description = 'A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip'),
          trip_type STRING OPTIONS (description = 'A code indicating whether the trip was a street-hail or a dispatch that is automatically assigned based on the metered rate in use but can be altered by the driver. 1= Street-hail 2= Dispatch'),
          congestion_surcharge NUMERIC OPTIONS (description = 'Congestion surcharge applied to trips in congested zones')
      )
      OPTIONS (
          format = 'CSV',
          uris = ['{{render(vars.gcs_file)}}'],
          skip_leading_rows = 1,
          ignore_unknown_values = TRUE
      );
  # Per-month native table: every external row plus an MD5 hash of vendor,
  # pickup/dropoff time and pickup/dropoff zone, and the source filename.
  - id: bq_green_table_tmp
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      CREATE OR REPLACE TABLE `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}`
      AS
      SELECT
        MD5(CONCAT(
          COALESCE(CAST(VendorID AS STRING), ""),
          COALESCE(CAST(lpep_pickup_datetime AS STRING), ""),
          COALESCE(CAST(lpep_dropoff_datetime AS STRING), ""),
          COALESCE(CAST(PULocationID AS STRING), ""),
          COALESCE(CAST(DOLocationID AS STRING), "")
        )) AS unique_row_id,
        "{{render(vars.file)}}" AS filename,
        *
      FROM `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}_ext`;
  # Insert only rows whose unique_row_id is not already in the target table,
  # so re-running the same month does not insert those rows again.
  - id: bq_green_merge
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      MERGE INTO `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.green_tripdata` T
      USING `{{kv('GCP_PROJECT_ID')}}.{{render(vars.table)}}` S
      ON T.unique_row_id = S.unique_row_id
      WHEN NOT MATCHED THEN
        INSERT (unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime, store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge)
        VALUES (S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime, S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count, S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee, S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge);
# Final task: remove the files stored for the current execution.
- id: purge_files
  description: 'To avoid cluttering your storage, we will remove the downloaded files'
  type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
# Shared connection settings for the GCP plugin tasks in this flow, so the
# individual GCS and BigQuery tasks do not repeat them. Every value is read
# from the namespace KV store at run time.
pluginDefaults:
- type: io.kestra.plugin.gcp
  values:
    projectId: "{{kv('GCP_PROJECT_ID')}}"
    location: "{{kv('GCP_LOCATION')}}"
    bucket: "{{kv('GCP_BUCKET_NAME')}}"
    # NOTE(review): credentials come from the KV store; presumably a service
    # account key - confirm it is not exposed to unintended readers.
    serviceAccount: "{{kv('GCP_CREDS')}}"
# Monthly schedules, one per taxi type, offset by an hour from each other.
# Cron fields are: minute hour day-of-month month day-of-week.
# NOTE(review): no timezone is set here; confirm which zone the scheduler uses.
triggers:
# 09:00 on the 1st day of every month, green dataset.
- id: green_schedule
  type: io.kestra.plugin.core.trigger.Schedule
  cron: '0 9 1 * *'
  inputs:
    taxi: green
# 10:00 on the 1st day of every month, yellow dataset.
- id: yellow_schedule
  type: io.kestra.plugin.core.trigger.Schedule
  cron: '0 10 1 * *'
  inputs:
    taxi: yellow