import datetime
import glob
import json
import os
import shutil
import time
from multiprocessing import Pool

import pandas as pd

import covid_data_api
import covid_data_bed
import covid_data_briefing
import covid_data_dash
import covid_data_situation
import covid_data_testing
import covid_data_tweets
import covid_data_vac
from utils_pandas import add_data
from utils_pandas import cum2daily
from utils_pandas import export
from utils_pandas import import_csv
from utils_pandas import weekly2daily
from utils_scraping import CHECK_NEWER
from utils_scraping import logger
from utils_scraping import USE_CACHE_DATA
from utils_scraping import web_files
from utils_thai import DISTRICT_RANGE
from utils_thai import get_fuzzy_provinces
from utils_thai import join_provinces
from utils_thai import today


def prov_to_districts(dfprov):
# Reduce down to health areas
dfprov_grouped = dfprov.groupby(["Date", "Health District Number"]).sum(min_count=1).reset_index()
dfprov_grouped = dfprov_grouped.pivot(index="Date", columns=['Health District Number'])
dfprov_grouped = dfprov_grouped.rename(columns=dict((i, f"Area {i}") for i in DISTRICT_RANGE))
    # Summing across all provinces can cause problems, as data might be missing.
# by_type = dfprov_grouped.groupby(level=0, axis=1).sum(min_count=1)
# Collapse columns to "Cases Proactive Area 13" etc
dfprov_grouped.columns = dfprov_grouped.columns.map(' '.join).str.strip()
by_area = dfprov_grouped # .combine_first(by_type)
    # Ensure we have columns for all health districts, even when they have no data
    for i in DISTRICT_RANGE:
        for col in [f"Cases Walkin Area {i}", f"Cases Proactive Area {i}"]:
            if col not in by_area:
                by_area[col] = pd.Series(index=by_area.index, name=col, dtype=float)
return by_area
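

# A minimal sketch of the transformation above, using hypothetical toy data.
# It is never called; it exists only to illustrate that provinces in the same
# health district are summed and the columns collapse to "Cases Walkin Area 1"
# etc., with the remaining district columns NaN-filled.
def _example_prov_to_districts():
    toy = pd.DataFrame({
        "Date": pd.to_datetime(["2021-06-01", "2021-06-01"]),
        "Health District Number": [1, 1],
        "Cases Walkin": [5, 3],
    })
    # Expect one row (2021-06-01) with "Cases Walkin Area 1" == 8
    return prov_to_districts(toy)
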
################################
# Misc
################################
def get_hospital_resources():
    # Per-hospital stats: PUI, confirmed, recovered etc.
fields = [
'OBJECTID', 'ID', 'agency_code', 'label', 'agency_status', 'status',
'address', 'province', 'amphoe', 'tambol', 'latitude', 'longitude',
'level_performance', 'ministryname', 'depart', 'ShareRoom_Total',
'ShareRoom_Available', 'ShareRoom_Used', 'Private_AIIR_Total',
'Private_AIIR_Available', 'Private_AIIR_Used',
'Private_Modified_AIIR_Total', 'Private_Modified_AIIR_Available',
'Private_Modified_AIIR_Used', 'Private_Isolation_room_Total',
'Private_Isolation_room_Availabl', 'Private_Isolation_room_Used',
'Private_Cohort_ward_Total', 'Private_Cohort_ward_Available',
'Private_Cohort_ward_Used', 'Private_High_Flow_Total',
'Private_High_Flow_Available', 'Private_High_Flow_Used',
'Private_OR_negative_pressure_To', 'Private_OR_negative_pressure_Av',
'Private_OR_negative_pressure_Us', 'Private_ICU_Total',
'Private_ICU_Available', 'Private_ICU_Used',
'Private_ARI_clinic_Total', 'Private_ARI_clinic_Available',
'Private_ARI_clinic_Used', 'Volume_control_Total',
'Volume_control_Available', 'Volume_control_Used',
'Pressure_control_Total', 'Pressure_control_Available',
'Pressure_control_Used', 'Volumecontrol_Child_Total',
'Volumecontrol_Child_Available', 'Volumecontrol_Child_Used',
'Ambulance_Total', 'Ambulance_Availble', 'Ambulance_Used',
'Pills_Favipiravir_Total', 'Pills_Favipiravir_Available',
'Pills_Favipiravir_Used', 'Pills_Oseltamivir_Total',
'Pills_Oseltamivir_Available', 'Pills_Oseltamivir_Used',
'Pills_ChloroquinePhosphate_Tota', 'Pills_ChloroquinePhosphate_Avai',
'Pills_ChloroquinePhosphate_Used', 'Pills_LopinavirRitonavir_Total',
'Pills_LopinavirRitonavir_Availa', 'Pills_LopinavirRitonavir_Used',
'Pills_Darunavir_Total', 'Pills_Darunavir_Available',
'Pills_Darunavir_Used', 'Lab_PCRTest_Total', 'Lab_PCRTest_Available',
'Lab_PCRTest_Used', 'Lab_RapidTest_Total', 'Lab_RapidTest_Available',
'Lab_RapidTest_Used', 'Face_shield_Total', 'Face_shield_Available',
'Face_shield_Used', 'Cover_all_Total', 'Cover_all_Available',
'Cover_all_Used', 'ถุงมือไนไตรล์ชนิดใช้', 'ถุงมือไนไตรล์ชนิดใช้_1',
'ถุงมือไนไตรล์ชนิดใช้_2', 'ถุงมือไนไตรล์ชนิดใช้_3',
'ถุงมือไนไตรล์ชนิดใช้_4', 'ถุงมือไนไตรล์ชนิดใช้_5',
'ถุงมือยางชนิดใช้แล้ว', 'ถุงมือยางชนิดใช้แล้ว_1',
'ถุงมือยางชนิดใช้แล้ว_2', 'ถุงสวมขา_Leg_cover_Total',
'ถุงสวมขา_Leg_cover_Available', 'ถุงสวมขา_Leg_cover_Used',
'พลาสติกหุ้มคอ_HOOD_Total', 'พลาสติกหุ้มคอ_HOOD_Available',
'พลาสติกหุ้มคอ_HOOD_Used', 'พลาสติกหุ้มรองเท้า_Total',
'พลาสติกหุ้มรองเท้า_Availab', 'พลาสติกหุ้มรองเท้า_Used',
'แว่นครอบตาแบบใส_Goggles_Total', 'แว่นครอบตาแบบใส_Goggles_Availab',
'แว่นครอบตาแบบใส_Goggles_Used', 'เสื้อกาวน์ชนิดกันน้ำ_T',
'เสื้อกาวน์ชนิดกันน้ำ_A', 'เสื้อกาวน์ชนิดกันน้ำ_U',
'หมวกคลุมผมชนิดใช้แล้', 'หมวกคลุมผมชนิดใช้แล้_1',
'หมวกคลุมผมชนิดใช้แล้_2', 'เอี๊ยมพลาสติกใส_Apron_Total',
'เอี๊ยมพลาสติกใส_Apron_Available', 'เอี๊ยมพลาสติกใส_Apron_Used',
'UTM_Total', 'UTM_Available', 'UTM_Used', 'VTM_Total', 'VTM_Available',
'VTM_Used', 'Throat_Swab_Total', 'Throat_Swab_Available',
'Throat_Swab_Used', 'NS_Swab_Total', 'NS_Swab_Available',
'NS_Swab_Used', 'Surgicalmask_Total', 'Surgicalmask_Available',
'Surgicalmask_Used', 'N95_Total', 'N95_Available', 'N95_Used',
'Dr_ChestMedicine_Total', 'Dr_ChestMedicine_Available',
'Dr_ChestMedicine_Used', 'Dr_ID_Medicine_Total',
'Dr_ID_Medicine_Availble', 'Dr_ID_Medicine_Used', 'Dr_Medical_Total',
'Dr_Medical_Available', 'Dr_Medical_Used', 'Nurse_ICN_Total',
'Nurse_ICN_Available', 'Nurse_ICN_Used', 'Nurse_RN_Total',
'Nurse_RN_Available', 'Nurse_RN_Used', 'Pharmacist_Total',
'Pharmacist_Available', 'Pharmacist_Used', 'MedTechnologist_Total',
'MedTechnologist_Available', 'MedTechnologist_Used', 'Screen_POE',
'Screen_Walk_in', 'PUI', 'Confirm_mild', 'Confirm_moderate',
'Confirm_severe', 'Confirm_Recovered', 'Confirm_Death', 'GlobalID',
'region_health', 'CoverAll_capacity', 'ICU_Covid_capacity',
'N95_capacity', 'AIIR_room_capacity', 'CoverAll_status',
'Asymptomatic', 'ICUforCovidTotal', 'ICUforCovidAvailable',
'ICUforCovidUsed'
]
# pui = "https://services8.arcgis.com/241MQ9HtPclWYOzM/arcgis/rest/services/Corona_Date/FeatureServer/0/query?f=json&where=1%3D1&returnGeometry=false&spatialRel=esriSpatialRelIntersects&outFields=*&orderByFields=Date%20asc&resultOffset=0&resultRecordCount=32000&resultType=standard&cacheHint=true" # noqa: E501
# icu = "https://services8.arcgis.com/241MQ9HtPclWYOzM/arcgis/rest/services/Hospital_Data_Dashboard/FeatureServer/0/query?f=json&where=1%3D1&returnGeometry=false&spatialRel=esriSpatialRelIntersects&outFields=*&outStatistics=%5B%7B%22statisticType%22%3A%22sum%22%2C%22onStatisticField%22%3A%22Private_ICU_Total%22%2C%22outStatisticFieldName%22%3A%22value%22%7D%5D&resultType=standard&cacheHint=true" # noqa: E501
rows = []
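    # Page through the ArcGIS FeatureServer 1000 records at a time via resultOffset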
for page in range(0, 2000, 1000):
every_district = f"https://services8.arcgis.com/241MQ9HtPclWYOzM/arcgis/rest/services/Hospital_Data_Dashboard/FeatureServer/0/query?f=json&where=1%3D1&returnGeometry=false&spatialRel=esriSpatialRelIntersects&outFields=*&resultOffset={page}&resultRecordCount=1000&cacheHint=true" # noqa: E501
file, content, _ = next(web_files(every_district, dir="inputs/json", check=True))
jcontent = json.loads(content)
rows.extend([x['attributes'] for x in jcontent['features']])
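    # Aggregate the per-hospital rows up to province level, keyed by today's date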
data = pd.DataFrame(rows).groupby("province").sum()
data['Date'] = today().date()
data['Date'] = pd.to_datetime(data['Date'])
data = data.reset_index().set_index(["Date", "province"])
old = import_csv("hospital_resources")
if old is not None:
old = old.set_index(["Date", "province"])
# TODO: seems to be dropping old data. Need to test
data = add_data(old, data)
export(data, "hospital_resources", csv_only=True)
    return data


# TODO: Additional data sources
# - new moph apis
# - https://covid19.ddc.moph.go.th/
# - medical supplies (tableau)
# - https://public.tableau.com/app/profile/karon5500/viz/moph_covid_v3/Story1
# - is it accurate?
# - no timeseries
# - vaccine imports (unofficial) (getting out of date?)
# - https://docs.google.com/spreadsheets/u/1/d/1BaCh5Tbm1EXwh4SeRM9dv-yemK2J5RpO-dz28UVtX3s/htmlview?fbclid=IwAR36L3itMKFv6fq7q-7_CF4WpxtI-QGQAcJ1f62BLen6N6IHc1iq-u-wWNI/export?gid=0&format=csv # noqa
# - vaccine dashboard (power BI)
# - https://dashboard-vaccine.moph.go.th/dashboard.html
# - groups, ages, manuf per prov. ages per group all thailand
# - no timeseries
# - Vaccine total numbers in at risk groups
# - https://hdcservice.moph.go.th/hdc/main/index.php
# - vaccine slides
# - has complications list but in graphic
# - briefings
# - clusters per day
# - nationality of deaths
# - time to death?
# - deaths at home
# - test reports
# - top labs over time
# Public transport usage to determine mobility?
# - https://datagov.mot.go.th/dataset/covid-19/resource/71a552d0-0fea-4e05-b78c-42d58aa88db6
# - doesn't have pre 2020 dailies though
# health district 8 data - https://r8way.moph.go.th/r8way/covid-19


def do_work(job):
    start = time.time()
    logger.info(f"==== {job.__name__} Start ====")
    data = job()
    logger.info(f"==== {job.__name__} done in {datetime.timedelta(seconds=time.time() - start)} ====")
    return (job.__name__, data)


def scrape_and_combine():
os.makedirs("api", exist_ok=True)
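    # "quick" runs reuse the previously exported combined.csv so only newer data is scraped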
quick = USE_CACHE_DATA and os.path.exists(os.path.join('api', 'combined.csv'))
MAX_DAYS = int(os.environ.get("MAX_DAYS", 1 if USE_CACHE_DATA else 0))
    logger.info('\n\nUSE_CACHE_DATA = {}\nCHECK_NEWER = {}\nMAX_DAYS = {}\n\n', USE_CACHE_DATA, CHECK_NEWER, MAX_DAYS)
# TODO: replace with cli --data=situation,briefings --start=2021-06-01 --end=2021-07-01
# "--data=" to plot only
if USE_CACHE_DATA and MAX_DAYS == 0:
old = import_csv("combined")
old = old.set_index("Date")
return old
jobs = [
# covid_data_vac.vac_slides,
# covid_data_vac.vaccination_reports,
# covid_data_briefing.get_cases_by_prov_briefings,
covid_data_dash.dash_weekly,
covid_data_dash.dash_province_weekly,
covid_data_dash.dash_by_province,
covid_data_api.get_cases_by_demographics_api,
covid_data_dash.dash_ages,
# covid_data_situation.get_thai_situation,
# covid_data_situation.get_en_situation,
covid_data_testing.get_test_reports,
covid_data_dash.dash_daily,
covid_data_api.excess_deaths,
covid_data_testing.get_tests_by_day,
covid_data_testing.get_tests_per_province,
# covid_data_tweets.get_cases_by_prov_tweets,
covid_data_api.get_cases_timelineapi,
covid_data_api.get_cases_timelineapi_weekly,
covid_data_testing.get_variant_reports,
covid_data_api.ihme_dataset,
covid_data_api.timeline_by_province,
covid_data_api.timeline_by_province_weekly,
covid_data_api.deaths_by_province_weekly,
        # This doesn't add any more info since the severe cases column was a mistake
# covid_data_dash.dash_trends_prov
# covid_data_bed.get_df
# covid_data_situation.get_situation_today
]
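    # Run the scrape jobs in parallel: a single worker when MAX_DAYS > 0 (debug
    # runs), otherwise Pool(None) uses one process per CPU. Each job returns a
    # (name, data) tuple, collected into a dict keyed by function name.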
with Pool(1 if MAX_DAYS > 0 else None) as pool:
res = dict(pool.imap_unordered(do_work, jobs))
pool.close()
pool.join()
logger.info(f"data={len(res)}")
# vac_reports, vac_reports_prov = res['vaccination_reports']
# briefings_prov, cases_briefings = res['get_cases_by_prov_briefings']
cases_demo, risks_prov, case_api_by_area = res['get_cases_by_demographics_api']
# tweets_prov, twcases = res['get_cases_by_prov_tweets']
deaths_weekly, deaths_prov_weekly = res['deaths_by_province_weekly']
# Combine dashboard data
# dash_by_province = dash_trends_prov.combine_first(dash_by_province)
    # export(res['dash_by_province'], "moph_dashboard_prov", csv_only=True, dir="inputs/json")
    # cached under "inputs/json"; copied to "api" so the files are downloadable
for file in glob.glob('inputs/json/moph*.csv'):
shutil.copy(file, "api")
# Export briefings
briefings = import_csv("cases_briefings", ["Date"], False)
# briefings = briefings.combine_first(cases_briefings).combine_first(twcases)
# export(briefings, "cases_briefings")
vaccols = [f"Vac Given {d} Cum" for d in range(1, 5)]
hospcols = [c for c in res['dash_province_weekly'].columns if 'Hospitalized' in c]
# Export per province
briefings_prov = import_csv("cases_briefings_prov", ["Date", "Province"], False)
# export(briefings_prov, "cases_briefings_prov", csv_only=True)
    # TODO: put tweets_prov into cases_briefings_prov
dfprov = import_csv("cases_by_province", ["Date", "Province"], not USE_CACHE_DATA)
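    # Note combine_first keeps existing non-NA values and only fills gaps, so
    # sources earlier in this chain take precedence over later ones.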
dfprov = dfprov.combine_first(
briefings_prov).combine_first(
res['timeline_by_province']).combine_first(
res['timeline_by_province_weekly']).combine_first(
deaths_prov_weekly).combine_first(
res['dash_by_province']).combine_first(
cum2daily(res['dash_province_weekly'], drop=False, exclude=vaccols + hospcols)).combine_first(
# tweets_prov).combine_first(
risks_prov) # TODO: check they agree
dfprov = join_provinces(dfprov, on="Province")
if "Hospitalized Severe" in dfprov.columns:
# Made a mistake. This is really Cases Proactive
dfprov["Cases Proactve"] = dfprov["Hospitalized Severe"]
dfprov = dfprov.drop(columns=["Hospitalized Severe"])
export(dfprov, "cases_by_province")
# Export per district (except tests which are dodgy?)
by_area = prov_to_districts(dfprov[[c for c in dfprov.columns if "Tests" not in c]])
cases_by_area = import_csv("cases_by_area", ["Date"], not USE_CACHE_DATA)
cases_by_area = cases_by_area.combine_first(by_area).combine_first(case_api_by_area)
export(cases_by_area, "cases_by_area")
# Export IHME dataset
export(res['ihme_dataset'], "ihme")
# Export situation
situation = covid_data_situation.export_situation(res.get('get_thai_situation', None), res.get('get_en_situation', None))
# vac = covid_data_vac.export_vaccinations(vac_reports, vac_reports_prov, res['vac_slides'])
vac = import_csv("vac_timeline", ['Date'])
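    # The weekly dashboard columns are cumulative, so diff them into daily values
    # (e.g. a cumulative [10, 15, 21] becomes [10, 5, 6]), leaving the vaccination
    # and hospitalisation columns as-is; weekly2daily then presumably spreads each
    # weekly "Deaths" total across the days of its week.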
    dash_weekly = cum2daily(res['dash_weekly'], drop=False, exclude=vaccols + hospcols)
    dash_weekly = dash_weekly.combine_first(weekly2daily(
        dash_weekly[[c for c in dash_weekly.columns if "Deaths " in c]]))
logger.info("========Combine all data sources==========")
df = pd.DataFrame(columns=["Date"]).set_index("Date")
for f in [
res['get_test_reports'],
res['get_tests_by_day'],
briefings,
res['get_cases_timelineapi'],
res['get_cases_timelineapi_weekly'],
deaths_weekly,
# twcases,
cases_demo,
cases_by_area,
situation,
res['dash_ages'],
res['dash_daily'],
dash_weekly,
vac,
]:
df = df.combine_first(f)
logger.info(df)
if quick:
old = import_csv("combined", index=["Date"])
df = df.combine_first(old)
return df
else:
export(df, "combined", csv_only=True)
export(get_fuzzy_provinces(), "fuzzy_provinces", csv_only=True)
        return df


if __name__ == "__main__":
    # Scrape everything and write the combined exports
    scrape_and_combine()