-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathenterprise_functions.py
281 lines (250 loc) · 10.2 KB
/
enterprise_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
import os
import json
import requests
from datetime import datetime as pydatetime, timedelta, timezone
from typing import Optional, Callable, Any, Set
from dotenv import load_dotenv
load_dotenv()
def fetch_datetime(
format_str: str = "%Y-%m-%d %H:%M:%S",
unix_ts: int | None = None,
tz_offset_seconds: int | None = None
) -> str:
"""
Returns either the current UTC date/time in the given format, or if unix_ts
is given, converts that timestamp to either UTC or local time (tz_offset_seconds).
:param format_str: The strftime format, e.g. "%Y-%m-%d %H:%M:%S".
:param unix_ts: Optional Unix timestamp. If provided, returns that specific time.
:param tz_offset_seconds: If provided, shift the datetime by this many seconds from UTC.
:return: A JSON string containing the "datetime" or an "error" key/value.
"""
try:
if unix_ts is not None:
dt_utc = pydatetime.fromtimestamp(unix_ts, tz=timezone.utc)
else:
dt_utc = pydatetime.now(timezone.utc)
if tz_offset_seconds is not None:
local_tz = timezone(timedelta(seconds=tz_offset_seconds))
dt_local = dt_utc.astimezone(local_tz)
result_str = dt_local.strftime(format_str)
else:
result_str = dt_utc.strftime(format_str)
return json.dumps({"datetime": result_str})
except Exception as e:
return json.dumps({"error": f"Exception: {str(e)}"})
def fetch_weather(
    location: str,
    country_code: str = "",
    state_code: str = "",
    limit: int = 1,
    timeframe: str = "current",
    time_offset: int = 0,
    dt_unix: Optional[int] = None
) -> str:
    """
    Fetch weather data from OpenWeather for the specified location and timeframe.

    The location is first geocoded to latitude/longitude (the first geocoding
    hit is used), then the One Call 3.0 endpoint matching ``timeframe`` is
    queried in metric units. API keys are read from the environment variables
    ``OPENWEATHER_GEO_API_KEY`` and ``OPENWEATHER_ONE_API_KEY``.

    :param location: The city or place name to look up.
    :param country_code: (optional) e.g. 'US' or 'GB' to narrow down the search.
    :param state_code: (optional) The state or province code, e.g. 'CA'. It is
        only used when ``country_code`` is also given.
    :param limit: (optional) The max number of geocoding results (defaults to 1).
    :param timeframe: 'current', 'hourly', 'daily', 'timemachine' or 'overview'
        (case-insensitive). Any other value is treated like 'current'.
    :param time_offset: For 'hourly' or 'daily', the index into the forecast
        array. For 'overview', the number of days from today (UTC).
    :param dt_unix: A Unix timestamp, required if timeframe='timemachine'.
    :return: A JSON string containing weather data, or an "error" key if
        anything went wrong (bad input, HTTP failure, timeout, bad payload).
    """
    # Bound every HTTP call so an unresponsive endpoint surfaces as an "error"
    # result instead of blocking the caller indefinitely.
    request_timeout_s = 10
    onecall_base = "https://api.openweathermap.org/data/3.0/onecall"

    try:
        if not location:
            return json.dumps({"error": "Missing required parameter: location"})

        geo_api_key = os.environ.get("OPENWEATHER_GEO_API_KEY")
        one_api_key = os.environ.get("OPENWEATHER_ONE_API_KEY")
        if not geo_api_key or not one_api_key:
            return json.dumps({"error": "Missing OpenWeather API keys in environment."})

        # Validate the request shape before spending a geocoding round-trip.
        tf = timeframe.lower()
        if tf == "timemachine" and dt_unix is None:
            return json.dumps({
                "error": "For timeframe='timemachine', you must provide 'dt_unix'."
            })

        # Convert location -> lat/lon.
        if country_code and state_code:
            query = f"{location},{state_code},{country_code}"
        elif country_code:
            query = f"{location},{country_code}"
        else:
            query = location

        # `params=` lets requests URL-encode the values, so place names with
        # spaces, '&', '#', etc. cannot corrupt or extend the query string.
        # HTTPS keeps the API key out of cleartext traffic.
        geo_resp = requests.get(
            "https://api.openweathermap.org/geo/1.0/direct",
            params={"q": query, "limit": limit, "appid": geo_api_key},
            timeout=request_timeout_s,
        )
        if geo_resp.status_code != 200:
            return json.dumps({
                "error": "Geocoding request failed",
                "status_code": geo_resp.status_code,
                "details": geo_resp.text
            })

        geocode_data = geo_resp.json()
        if not geocode_data:
            return json.dumps({"error": f"No geocoding results for '{location}'."})

        lat = geocode_data[0].get("lat")
        lon = geocode_data[0].get("lon")
        if lat is None or lon is None:
            return json.dumps({"error": "No valid lat/long returned."})

        # Pick the One Call endpoint and its timeframe-specific parameters.
        params = {"lat": lat, "lon": lon}
        if tf == "timemachine":
            url = f"{onecall_base}/timemachine"
            params["dt"] = dt_unix
        elif tf == "overview":
            # Timezone-aware replacement for the deprecated datetime.utcnow().
            target_day = pydatetime.now(timezone.utc) + timedelta(days=time_offset)
            url = f"{onecall_base}/overview"
            params["date"] = target_day.strftime("%Y-%m-%d")
        else:
            # Ask only for the section that will actually be read.
            url = onecall_base
            params["exclude"] = {
                "current": "minutely,hourly,daily,alerts",
                "hourly": "minutely,daily,alerts",
                "daily": "minutely,hourly,alerts",
            }.get(tf, "")
        params["units"] = "metric"
        params["appid"] = one_api_key

        resp = requests.get(url, params=params, timeout=request_timeout_s)
        if resp.status_code != 200:
            return json.dumps({
                "error": "Weather API failed",
                "status_code": resp.status_code,
                "details": resp.text
            })

        data = resp.json()

        if tf == "overview":
            # The overview endpoint returns free text rather than measurements.
            overview = data.get("weather_overview", "No overview text provided.")
            return json.dumps({
                "location": location,
                "latitude": lat,
                "longitude": lon,
                "weather_overview": overview,
                "description": "N/A",
                "temperature_c": "N/A",
                "temperature_f": "N/A",
                "humidity_percent": "N/A",
            })

        if tf == "timemachine":
            arr = data.get("data", [])
            if not arr:
                return json.dumps({"error": "No 'data' array for timemachine"})
            sel = arr[0]
        elif tf in ("hourly", "daily"):
            arr = data.get(tf, [])
            if time_offset < 0 or time_offset >= len(arr):
                unit = "hour" if tf == "hourly" else "day"
                return json.dumps({
                    "error": f"Requested {unit} index {time_offset}, but length is {len(arr)}"
                })
            sel = arr[time_offset]
        else:
            sel = data.get("current", {})

        if not isinstance(sel, dict):
            return json.dumps({"error": f"Unexpected data format for timeframe={timeframe}"})

        description = "N/A"
        if sel.get("weather"):
            description = sel["weather"][0].get("description", "N/A")

        temp_c = sel.get("temp")
        if isinstance(temp_c, dict):
            # Daily forecast entries report `temp` as an object (day/min/max/...)
            # rather than a number; use the daytime value so the Fahrenheit
            # conversion below works for timeframe='daily' too.
            temp_c = temp_c.get("day")
        humidity = sel.get("humidity", "N/A")

        if isinstance(temp_c, (int, float)):
            temp_f = round(temp_c * 9 / 5 + 32, 2)
        else:
            temp_f = "N/A"

        return json.dumps({
            "location": location,
            "latitude": lat,
            "longitude": lon,
            "description": description,
            "temperature_c": temp_c if temp_c is not None else "N/A",
            "temperature_f": temp_f,
            "humidity_percent": humidity,
        })
    except Exception as e:
        # Tool-style contract: never raise, always hand back a JSON string.
        return json.dumps({"error": f"Exception occurred: {str(e)}"})
def fetch_stock_price(
    ticker_symbol: str,
    period: str = "1d",
    interval: str = "1d",
    start: Optional[str] = None,
    end: Optional[str] = None
) -> str:
    """
    Fetch stock price info for a given ticker symbol, with optional historical data.

    The price history is retrieved through yfinance and returned as a list of
    per-row records. The timestamp column of each record is rendered as a
    "%Y-%m-%d %H:%M:%S" string so the payload is JSON-serializable.

    :param ticker_symbol: The ticker symbol to look up, e.g. "MSFT".
    :param period: Over what period to pull data, e.g. "1d", "1mo", "1y".
    :param interval: The granularity of data, e.g. "1d", "1h".
    :param start: (optional) The start date/time in YYYY-MM-DD or YYYY-MM-DD HH:MM:SS format.
    :param end: (optional) The end date/time in similar format.
    :return: A JSON string ``{"ticker_symbol": ..., "data": [...]}``, or an
        "error" message. Failures, including yfinance not being installed,
        are reported through the "error" key rather than raised.
    """
    try:
        # Imported lazily, and inside the try, so a missing optional dependency
        # is reported through the same JSON "error" channel as other failures.
        import yfinance as yf

        stock = yf.Ticker(ticker_symbol)
        history = stock.history(period=period, interval=interval, start=start, end=end)
        if history.empty:
            return json.dumps({"error": f"No data found for symbol: {ticker_symbol}"})

        history = history.reset_index()
        # yfinance names the time index "Date" for daily-or-longer intervals
        # and "Datetime" for intraday ones (e.g. "1h"); fall back to the first
        # column, which is where reset_index() places the former index.
        time_col = next(
            (name for name in ("Date", "Datetime") if name in history.columns),
            history.columns[0],
        )
        history[time_col] = history[time_col].dt.strftime('%Y-%m-%d %H:%M:%S')

        return json.dumps({
            "ticker_symbol": ticker_symbol.upper(),
            "data": history.to_dict(orient="records"),
        })
    except (KeyError, ValueError) as e:
        return json.dumps({"error": f"Invalid or missing data: {e}"})
    except Exception as e:
        return json.dumps({"error": f"Unexpected issue - {type(e).__name__}: {e}"})
def send_email(recipient: str, subject: str, body: str) -> str:
    """
    Pretend to send an email and report what would have been sent.

    Nothing is transmitted: the function only assembles a log transcript and a
    confirmation message from its arguments.

    :param recipient: The email address or ID the message is addressed to.
    :param subject: The email subject line.
    :param body: The main text or HTML body of the email.
    :return: A JSON string with "logs" and "message" keys, or an "error" key if
        building the response raised.
    """
    try:
        transcript = [
            f"Sending email to {recipient}...",
            f"Subject: {subject}",
            f"Body:\n{body}",
            f"Email successfully sent to {recipient}.",
        ]
        payload = {
            "logs": transcript,
            "message": f"Email sent to {recipient}.",
        }
        return json.dumps(payload)
    except Exception as exc:
        return json.dumps({"error": f"Failed to send email: {exc}"})
# Collect the functions defined in this module into a set of callables for use
# from enterprise-streaming-agent.ipynb.
enterprise_fns: Set[Callable[..., Any]] = {
    fetch_datetime,
    fetch_weather,
    fetch_stock_price,
    send_email,
}