-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPeriodicReport
More file actions
358 lines (294 loc) · 14.5 KB
/
PeriodicReport
File metadata and controls
358 lines (294 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
import mysql.connector
import json
import re
import os # os 모듈 import
from datetime import date, datetime, time, timedelta
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from azure.ai.agents.models import ListSortOrder
from typing import Dict, Any, Optional, Tuple, List # List 타입 추가
# --- Configuration ---
# SECURITY(review): DB credentials are hardcoded here; move host/user/password
# into environment variables or a secrets store before deploying.
DB_CONFIG = {
    "host": "127.0.0.1",
    "user": "root",
    "password": "1234",
    "database": "AI_sleep_service",
    "port": 3307
}
PROJECT_ENDPOINT = "https://happy-mgpyzagf-eastus2.services.ai.azure.com/api/projects/happy-mgpyzagf-eastus2_project"
# Azure Agent ID; must be set to the real agent ID in the target environment.
AGENT_ID = os.environ.get("AGENT_ID", "your_agent_id_here")
# --- Column names of the periodic_reports table used throughout this script ---
TABLE_NAME = "periodic_reports"
SCORE_FIELD = "total_score"                         # sleep score column
TIME_FIELD = "total_sleep_time"                     # total sleep time column
BED_TIME_FIELD = "total_bed_time_minutes"           # time-in-bed column (minutes)
DEEP_TIME_FIELD = "total_deep_sleep_time_minutes"   # deep sleep column (minutes)
LIGHT_TIME_FIELD = "total_light_sleep_time_minutes" # light sleep column (minutes)
REM_TIME_FIELD = "total_rem_sleep_time_minutes"     # REM sleep column (minutes)
# -----------------------------
def simple_average(old_value: float, new_value: float) -> float:
    """Blend two values by taking their midpoint: (old + new) / 2."""
    combined = old_value + new_value
    return combined / 2.0
def cumulative_average(values: list) -> Optional[float]:
    """Arithmetic mean of the non-None entries; None when nothing remains."""
    usable = [float(entry) for entry in values if entry is not None]
    return sum(usable) / len(usable) if usable else None
def calculate_average_time(times: list) -> Optional[str]:
    """Average a list of time/datetime clock values, formatted as 'HH:MM:SS'.

    Entries earlier than 04:00 are shifted forward by one day so that
    bed-times straddling midnight average sensibly; the final result is
    wrapped back into a 24-hour clock. Returns None for an empty list.
    """
    if not times:
        return None
    DAY_SECONDS = 86400
    EARLY_MORNING_CUTOFF = 4 * 3600
    shifted = []
    for entry in times:
        clock = entry.time() if isinstance(entry, datetime) else entry
        secs = clock.hour * 3600 + clock.minute * 60 + clock.second
        if secs < EARLY_MORNING_CUTOFF:
            secs += DAY_SECONDS
        shifted.append(secs)
    mean_secs = (sum(shifted) / len(shifted)) % DAY_SECONDS
    hours, remainder = divmod(int(mean_secs), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02}"
def get_db_connection():
    """Open a MySQL connection (per DB_CONFIG) plus a dict-row cursor.

    Returns (connection, cursor). Connection errors are logged and re-raised.
    """
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
        dict_cursor = connection.cursor(dictionary=True)
    except mysql.connector.Error as err:
        print(f"❌ DB 연결 오류: {err}")
        raise
    return connection, dict_cursor
def get_period_avg(conn, cursor, user_id: int, start_date: date, end_date: date) -> Dict[str, Optional[Any]]:
    """Average the user's daily_report rows over [start_date, end_date].

    Returns a dict keyed by the periodic_reports column names plus
    'sleep_session_count' and 'avg_bed_time_str' (agent input only).
    Raises ValueError when the window contains no daily data.
    """
    # Pull the day-level metrics for the window.
    # NOTE(review): assumes bed_time_minutes is a daily_report column — confirm.
    cursor.execute("""
        SELECT
            score, total_sleep_minutes, bed_time_minutes,
            deep_sleep_minutes, light_sleep_minutes, rem_sleep_minutes,
            sleep_session_no
        FROM daily_report
        WHERE user_id=%s AND sleep_date BETWEEN %s AND %s
        ORDER BY sleep_date ASC
    """, (user_id, start_date, end_date))
    daily_rows = cursor.fetchall()
    if not daily_rows:
        raise ValueError(f"사용자 {user_id}의 {start_date}부터 {end_date}까지의 데이터가 없습니다.")

    # Distinct sleep sessions represented in the window.
    session_ids = {row['sleep_session_no'] for row in daily_rows
                   if row.get('sleep_session_no') is not None}

    # Bed-in timestamps come from sleep_session for the average bed time.
    cursor.execute("""
        SELECT bed_at
        FROM sleep_session
        WHERE user_id=%s AND DATE(bed_at) BETWEEN %s AND %s
    """, (user_id, start_date, end_date))
    bed_at_values = [row['bed_at'] for row in cursor.fetchall()
                     if row.get('bed_at') is not None]

    def mean_of(column: str) -> Optional[float]:
        # Mean over non-NULL values of one daily_report column.
        return cumulative_average([row.get(column) for row in daily_rows])

    return {
        "sleep_session_count": len(session_ids),
        SCORE_FIELD: mean_of('score'),
        TIME_FIELD: mean_of('total_sleep_minutes'),
        BED_TIME_FIELD: mean_of('bed_time_minutes'),
        DEEP_TIME_FIELD: mean_of('deep_sleep_minutes'),
        LIGHT_TIME_FIELD: mean_of('light_sleep_minutes'),
        REM_TIME_FIELD: mean_of('rem_sleep_minutes'),
        "avg_bed_time_str": calculate_average_time(bed_at_values),  # agent input
    }
def get_current_periodic_data(conn, cursor, user_id: int, start_date: date, duration_type: str) -> Optional[Dict[str, Any]]:
    """Fetch the stored periodic-report metrics for this user/period, if any."""
    select_sql = f"""
        SELECT {SCORE_FIELD}, {TIME_FIELD}, {BED_TIME_FIELD},
               {DEEP_TIME_FIELD}, {LIGHT_TIME_FIELD}, {REM_TIME_FIELD}
        FROM {TABLE_NAME}
        WHERE user_no = %s AND period_started_at = %s AND duration_type = %s;
    """
    cursor.execute(select_sql, (user_id, start_date, duration_type))
    return cursor.fetchone()
def clean_non_standard_json(text: str) -> str:
    """Best-effort normalization of an LLM's near-JSON text into strict JSON.

    - Strips markdown code fences (```json ... ```).
    - Quotes bare object keys, e.g. `{key: 1}` -> `{"key": 1}`. The previous
      pattern `(\\s*\\w+\\s*)` captured surrounding whitespace inside the quotes,
      producing keys like `"key "` that broke later lookups; only the word
      characters are quoted now.
    - Removes trailing commas before `}`/`]` regardless of intervening
      whitespace (the old code only handled the literal ',\\n}' sequence).
    """
    text = text.replace("```json", "").replace("```", "").strip()
    # Quote bare keys: only \w+ goes inside the quotes.
    text = re.sub(r'([\{\,]\s*)(\w+)(\s*:)', r'\1"\2"\3', text)
    # Drop trailing commas before a closing brace/bracket.
    text = re.sub(r',\s*([\}\]])', r'\1', text)
    return text
def _extract_json_blocks(text: str) -> List[str]:
    """Return the top-level '{...}' substrings of *text* with balanced braces.

    The previous non-greedy regex `\\{.*?\\}` stopped at the FIRST closing
    brace, truncating any JSON object containing a nested object. Braces
    inside string literals are not specially handled — acceptable for the
    agent's payloads (TODO confirm they never embed braces in strings).
    """
    blocks: List[str] = []
    depth = 0
    start = -1
    for index, ch in enumerate(text):
        if ch == '{':
            if depth == 0:
                start = index
            depth += 1
        elif ch == '}' and depth > 0:
            depth -= 1
            if depth == 0:
                blocks.append(text[start:index + 1])
    return blocks

def parse_agent_response(output_text: str) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
    """Parse the agent's reply into (data_json, analysis_json), or None.

    The agent is expected to return two JSON blocks; each is normalized via
    clean_non_standard_json() before parsing. Failures are logged, not raised.
    """
    json_blocks = _extract_json_blocks(output_text)
    if len(json_blocks) < 2:
        print(f"🚨 Agent 응답에서 2개의 JSON 블록을 찾지 못했습니다. (추출된 블록 수: {len(json_blocks)})")
        return None
    try:
        data_json = json.loads(clean_non_standard_json(json_blocks[0]))
        analysis_json = json.loads(clean_non_standard_json(json_blocks[1]))
        print("✅ JSON 2개 블록 파싱 성공.")
        return data_json, analysis_json
    except json.JSONDecodeError as e:
        print(f"🚨 JSON 파싱 오류: {e}")
        return None
def update_periodic_report_db(conn, cursor, user_id: int, start_date: date, end_date: date,
                              agent_data_json: Dict[str, Any], agent_analysis_json: Dict[str, Any],
                              period_avg: Dict[str, Any]):
    """Upsert one periodic_reports row for (user, period start, duration type).

    When a row already exists, each numeric metric is blended with the stored
    value as (P + D) / 2; otherwise this period's averages are inserted as-is.
    Text analysis fields come from agent_analysis_json. agent_data_json is
    currently unused but kept for interface compatibility. Commits on success.
    """

    def _num(value) -> float:
        # DB NULLs and all-None daily averages arrive here as None; the old
        # code passed them straight into simple_average() (dict.get's 0.0
        # default never fires for a key that exists with value None) and
        # raised TypeError. Coalesce to 0.0 before blending.
        return 0.0 if value is None else float(value)

    period_length = (end_date - start_date).days + 1
    # 28 days or more counts as a monthly report; anything shorter is weekly.
    duration_type = 'MONTHLY' if period_length >= 28 else 'WEEKLY'

    # Existing row for this period, if any.
    current_data = get_current_periodic_data(conn, cursor, user_id, start_date, duration_type)

    metric_fields = [SCORE_FIELD, TIME_FIELD, BED_TIME_FIELD,
                     DEEP_TIME_FIELD, LIGHT_TIME_FIELD, REM_TIME_FIELD]
    if current_data:
        # Blend stored value (P) with this period's average (D): (P + D) / 2.
        new_metrics = [simple_average(_num(current_data.get(field)), _num(period_avg[field]))
                       for field in metric_fields]
    else:
        # First write for this period: store the computed averages as-is
        # (None values become SQL NULL).
        new_metrics = [period_avg[field] for field in metric_fields]

    upsert_sql = f"""
        INSERT INTO {TABLE_NAME} (
            user_no, duration_type, period_started_at,
            sleep_session_count, {SCORE_FIELD}, {TIME_FIELD}, {BED_TIME_FIELD},
            {DEEP_TIME_FIELD}, {LIGHT_TIME_FIELD}, {REM_TIME_FIELD},
            improvement, weakness, recommendation, score_prediction_description
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            duration_type=VALUES(duration_type),
            sleep_session_count=VALUES(sleep_session_count),
            {SCORE_FIELD}=%s,
            {TIME_FIELD}=%s,
            {BED_TIME_FIELD}=%s,
            {DEEP_TIME_FIELD}=%s,
            {LIGHT_TIME_FIELD}=%s,
            {REM_TIME_FIELD}=%s,
            improvement=VALUES(improvement),
            weakness=VALUES(weakness),
            recommendation=VALUES(recommendation),
            score_prediction_description=VALUES(score_prediction_description);
    """
    # 14 INSERT placeholders first, then the six UPDATE-side metric placeholders.
    params = [
        user_id, duration_type, start_date,
        period_avg['sleep_session_count'],
        *new_metrics,
        agent_analysis_json.get('improvement', ''),
        agent_analysis_json.get('weakness', ''),
        agent_analysis_json.get('recommendation', ''),
        agent_analysis_json.get('score_prediction_description', ''),
        *new_metrics,
    ]
    cursor.execute(upsert_sql, params)
    conn.commit()
    print(f"✅ DB UPSERT 성공: user_no={user_id}, 기간: {start_date}, 타입: {duration_type}")
def check_updated_report(user_id: int, start_date_str: str, end_date_str: str):
    """Print the stored periodic report for verification.

    end_date_str is unused; it is kept only so this call mirrors
    run_local_periodic_report's signature. All errors are logged, not raised.
    """
    period_start = datetime.strptime(start_date_str, '%Y-%m-%d').date()
    conn = None
    try:
        conn, cursor = get_db_connection()
        cursor.execute(
            f"""
            SELECT *
            FROM {TABLE_NAME}
            WHERE user_no = %s
              AND period_started_at = %s;
            """,
            (user_id, period_start),
        )
        row = cursor.fetchone()
        print("\n================== [DB 최종 확인] ==================")
        if row is None:
            print("❌ 해당 조건의 주기 보고서 데이터를 찾을 수 없습니다. (업데이트 실패 또는 조건 불일치)")
        else:
            print("✅ 주기 보고서 업데이트 성공 확인:")
            for column, value in row.items():
                print(f"  {column}: {value}")
        print("=====================================================")
    except Exception as e:
        print(f"❌ DB 확인 중 오류 발생: {e}")
    finally:
        if conn and conn.is_connected():
            conn.close()
def run_local_periodic_report(user_id: int, start_date_str: str, end_date_str: str):
    """Run the full periodic-report pipeline locally for one user and period.

    Steps: average the period's daily data, call the Azure agent with the
    aggregates, parse its two-JSON response, and upsert periodic_reports.
    Dates are 'YYYY-MM-DD' strings. All failures are logged, not raised;
    the DB connection is always closed.
    """
    start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
    end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
    conn, cursor = get_db_connection()
    try:
        print(f"1. 기간 데이터 조회 및 평균 계산 시작: {start_date} ~ {end_date}")
        period_avg = get_period_avg(conn, cursor, user_id, start_date, end_date)
        print(f" -> 평균 데이터: {period_avg}")
        # --- PEM-certificate authentication setup ---
        print("Initializing Azure credentials using PEM certificate...")
        # SECURITY(review): client/tenant IDs are hardcoded and the certificate
        # is expected next to the script; move these to configuration.
        os.environ["AZURE_CLIENT_ID"] = "b119b103-2441-4d7a-bd05-16d97f44b0ad"
        os.environ["AZURE_TENANT_ID"] = "26080271-1d99-47dd-a23f-502db6ef9f34"
        CERT_PATH_RELATIVE = "./final_cert_for_azure.pem"
        os.environ["AZURE_CLIENT_CERTIFICATE_PATH"] = os.path.abspath(CERT_PATH_RELATIVE)
        try:
            # DefaultAzureCredential picks up the env vars set just above.
            credential = DefaultAzureCredential()
        except Exception as auth_error:
            print(f"❌ Azure 인증 실패: {auth_error}. 환경 변수 또는 로그인 상태를 확인하세요.")
            raise
        # --- end PEM authentication setup ---
        project = AIProjectClient(credential=credential, endpoint=PROJECT_ENDPOINT)
        agent = project.agents.get_agent(AGENT_ID)
        print("2. Azure Agent 호출 시작 (입력 데이터만 전달)...")
        thread = project.agents.threads.create()
        # Payload handed to the agent: the period aggregates only.
        agent_input_data = json.dumps({
            "user_no": user_id,
            "period_started_at": str(start_date),
            "period_ended_at": str(end_date),
            "sleep_session_count": period_avg['sleep_session_count'],
            SCORE_FIELD: period_avg[SCORE_FIELD],
            TIME_FIELD: period_avg[TIME_FIELD],
            BED_TIME_FIELD: period_avg[BED_TIME_FIELD],
            DEEP_TIME_FIELD: period_avg[DEEP_TIME_FIELD],
            LIGHT_TIME_FIELD: period_avg[LIGHT_TIME_FIELD],
            REM_TIME_FIELD: period_avg[REM_TIME_FIELD],
            "average_bed_time": period_avg['avg_bed_time_str']
        })
        project.agents.messages.create(
            thread_id=thread.id,
            role="user",
            content=agent_input_data
        )
        run = project.agents.runs.create_and_process(thread_id=thread.id, agent_id=agent.id)
        if run.status == "failed":
            raise Exception(f"Agent 실행 실패: {run.last_error}")
        messages = project.agents.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING)
        output_text = ""
        # Ascending order + overwrite means we keep only the LAST assistant
        # message's final text chunk.
        for message in messages:
            if message.text_messages and message.role == 'assistant':
                output_text = message.text_messages[-1].text.value
        print("3. Agent 응답 파싱 및 DB 업데이트 시작...")
        # The agent is expected to return two JSON blocks (data / analysis).
        parsed_data = parse_agent_response(output_text)
        if parsed_data:
            agent_data_json, agent_analysis_json = parsed_data
            update_periodic_report_db(conn, cursor, user_id, start_date, end_date, agent_data_json, agent_analysis_json, period_avg)
        else:
            print("❌ Agent 응답 파싱 실패. DB 업데이트를 건너킵니다.")
    except ValueError as e:
        # Raised by get_period_avg when the window has no daily data.
        print(f"⚠️ 데이터 부족 오류: {e}")
    except Exception as e:
        print(f"❌ 치명적인 오류 발생: {e}")
    finally:
        if conn and conn.is_connected():
            conn.close()
        print("DB 연결 종료.")
if __name__ == "__main__":
    # Ad-hoc local run: generate one weekly report, then read it back.
    demo_user = 1
    demo_start, demo_end = "2025-10-15", "2025-10-21"
    run_local_periodic_report(demo_user, demo_start, demo_end)
    check_updated_report(demo_user, demo_start, demo_end)