forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathclickhouse_helper.py
More file actions
217 lines (185 loc) · 6.88 KB
/
clickhouse_helper.py
File metadata and controls
217 lines (185 loc) · 6.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#!/usr/bin/env python3
from typing import List
import json
import logging
import time
import requests # type: ignore
from get_robot_token import get_parameter_from_ssm
from env_helper import GITHUB_REPOSITORY
from pr_info import PRInfo
from report import TestResults
class InsertException(Exception):
pass
class ClickHouseHelper:
def __init__(self, url=None):
if url is None:
url = get_parameter_from_ssm("clickhouse-test-stat-url")
self.url = url
self.auth = {
"X-ClickHouse-User": get_parameter_from_ssm("clickhouse-test-stat-login"),
"X-ClickHouse-Key": get_parameter_from_ssm("clickhouse-test-stat-password"),
}
@staticmethod
def _insert_json_str_info_impl(url, auth, db, table, json_str):
params = {
"database": db,
"query": f"INSERT INTO {table} FORMAT JSONEachRow",
"date_time_input_format": "best_effort",
"send_logs_level": "warning",
}
for i in range(5):
try:
response = requests.post(
url, params=params, data=json_str, headers=auth
)
except Exception as e:
error = f"Received exception while sending data to {url} on {i} attempt: {e}"
logging.warning(error)
continue
logging.info("Response content '%s'", response.content)
if response.ok:
break
error = (
"Cannot insert data into clickhouse at try "
+ str(i)
+ ": HTTP code "
+ str(response.status_code)
+ ": '"
+ str(response.text)
+ "'"
)
if response.status_code >= 500:
# A retriable error
time.sleep(1)
continue
logging.info(
"Request headers '%s', body '%s'",
response.request.headers,
response.request.body,
)
raise InsertException(error)
else:
raise InsertException(error)
def _insert_json_str_info(self, db, table, json_str):
self._insert_json_str_info_impl(self.url, self.auth, db, table, json_str)
def insert_event_into(self, db, table, event, safe=True):
event_str = json.dumps(event)
try:
self._insert_json_str_info(db, table, event_str)
except InsertException as e:
logging.error(
"Exception happened during inserting data into clickhouse: %s", e
)
if not safe:
raise
def insert_events_into(self, db, table, events, safe=True):
jsons = []
for event in events:
jsons.append(json.dumps(event))
try:
self._insert_json_str_info(db, table, ",".join(jsons))
except InsertException as e:
logging.error(
"Exception happened during inserting data into clickhouse: %s", e
)
if not safe:
raise
def _select_and_get_json_each_row(self, db, query):
params = {
"database": db,
"query": query,
"default_format": "JSONEachRow",
}
for i in range(5):
response = None
try:
response = requests.get(self.url, params=params, headers=self.auth)
response.raise_for_status()
return response.text
except Exception as ex:
logging.warning("Cannot insert with exception %s", str(ex))
if response:
logging.warning("Reponse text %s", response.text)
time.sleep(0.1 * i)
raise Exception("Cannot fetch data from clickhouse")
def select_json_each_row(self, db, query):
text = self._select_and_get_json_each_row(db, query)
result = []
for line in text.split("\n"):
if line:
result.append(json.loads(line))
return result
def prepare_tests_results_for_clickhouse(
pr_info: PRInfo,
test_results: TestResults,
check_status: str,
check_duration: float,
check_start_time: str,
report_url: str,
check_name: str,
) -> List[dict]:
pull_request_url = f"https://github.com/{GITHUB_REPOSITORY}/commits/master"
base_ref = "master"
head_ref = "master"
base_repo = pr_info.repo_full_name
head_repo = pr_info.repo_full_name
if pr_info.number != 0:
pull_request_url = pr_info.pr_html_url
base_ref = pr_info.base_ref
base_repo = pr_info.base_name
head_ref = pr_info.head_ref
head_repo = pr_info.head_name
common_properties = dict(
pull_request_number=pr_info.number,
commit_sha=pr_info.sha,
commit_url=pr_info.commit_html_url,
check_name=check_name,
check_status=check_status,
check_duration_ms=int(float(check_duration) * 1000),
check_start_time=check_start_time,
report_url=report_url,
pull_request_url=pull_request_url,
base_ref=base_ref,
base_repo=base_repo,
head_ref=head_ref,
head_repo=head_repo,
task_url=pr_info.task_url,
)
# Always publish a total record for all checks. For checks with individual
# tests, also publish a record per test.
result = [common_properties]
for test_result in test_results:
current_row = common_properties.copy()
test_name = test_result.name
test_status = test_result.status
test_time = test_result.time or 0
current_row["test_duration_ms"] = int(test_time * 1000)
current_row["test_name"] = test_name
current_row["test_status"] = test_status
if test_result.raw_logs:
# Protect from too big blobs that contain garbage
current_row["test_context_raw"] = test_result.raw_logs[: 32 * 1024]
else:
current_row["test_context_raw"] = ""
result.append(current_row)
return result
def mark_flaky_tests(
clickhouse_helper: ClickHouseHelper, check_name: str, test_results: TestResults
) -> None:
try:
query = f"""SELECT DISTINCT test_name
FROM checks
WHERE
check_start_time BETWEEN now() - INTERVAL 3 DAY AND now()
AND check_name = '{check_name}'
AND (test_status = 'FAIL' OR test_status = 'FLAKY')
AND pull_request_number = 0
"""
tests_data = clickhouse_helper.select_json_each_row("default", query)
master_failed_tests = {row["test_name"] for row in tests_data}
logging.info("Found flaky tests: %s", ", ".join(master_failed_tests))
for test_result in test_results:
if test_result.status == "FAIL" and test_result.name in master_failed_tests:
test_result.status = "FLAKY"
except Exception as ex:
logging.error("Exception happened during flaky tests fetch %s", ex)