Skip to content

Commit c0912cf

Browse files
committed
refactor: polish handling of HttpQuery waiting for final.
1 parent ad48962 commit c0912cf

File tree

4 files changed

+63
-61
lines changed

4 files changed

+63
-61
lines changed

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ impl HttpSessionConf {
413413
) {
414414
warn!(
415415
"[HTTP-QUERY] Last query id not finished yet, id = {}, state = {:?}, is_data_drained={}",
416-
id, state, last_query.is_data_drained.load(Ordering::Relaxed)
416+
id, state, last_query.data_drained_at.lock().is_some()
417417
);
418418
}
419419
}
@@ -517,7 +517,7 @@ pub enum HttpQueryState {
517517

518518
#[derive(Debug)]
519519
pub enum ExpireResult {
520-
Expired,
520+
Expired(StopReason),
521521
Sleep(Duration),
522522
Stopped,
523523
}
@@ -531,7 +531,11 @@ pub struct HttpQuery {
531531
executor: Arc<Mutex<Executor>>,
532532
page_manager: Arc<TokioMutex<PageManager>>,
533533
state: Arc<Mutex<HttpQueryState>>,
534-
is_data_drained: AtomicBool,
534+
// The client may never call conn.close(). To avoid meaningless timeout checks and heartbeats,
535+
// we mark the query as Finished 30s after its last page has been returned and stop them,
536+
// because a client retry should not take as long as result_timeout_secs.
537+
// heartbeat_handler should then notify the client that heartbeats are no longer needed.
538+
data_drained_at: Mutex<Option<Instant>>,
535539
/// The timeout for the query result polling. In the normal case, the client driver
536540
/// should fetch the paginated result in a timely manner, and the interval should not
537541
/// exceed this result_timeout_secs.
@@ -641,7 +645,7 @@ impl HttpQuery {
641645
query_mem_stat: ctx.get_query_memory_tracking(),
642646
is_txn_mgr_saved: Default::default(),
643647
last_session_conf: Default::default(),
644-
is_data_drained: AtomicBool::new(false),
648+
data_drained_at: Default::default(),
645649
})
646650
}
647651

@@ -869,7 +873,10 @@ impl HttpQuery {
869873
}
870874

871875
pub fn set_data_drained(&self) {
872-
self.is_data_drained.store(true, Ordering::Relaxed);
876+
let mut guard = self.data_drained_at.lock();
877+
if guard.is_none() {
878+
*guard = Some(Instant::now())
879+
}
873880
}
874881

875882
pub fn check_removed(&self) -> Option<StopReason> {
@@ -884,12 +891,17 @@ impl HttpQuery {
884891
// return Duration to sleep
885892
#[async_backtrace::framed]
886893
pub async fn check_expire(&self) -> ExpireResult {
894+
let is_drained = self.data_drained_at.lock().is_some();
887895
let expire_state = self.state.lock();
888896
match *expire_state {
889897
HttpQueryState::ExpireAt(expire_at) => {
890898
let now = Instant::now();
891899
if now >= expire_at {
892-
ExpireResult::Expired
900+
ExpireResult::Expired(if is_drained {
901+
StopReason::Finished
902+
} else {
903+
StopReason::Timeout
904+
})
893905
} else {
894906
ExpireResult::Sleep(expire_at - now)
895907
}
@@ -904,9 +916,15 @@ impl HttpQuery {
904916
let mut expire_state = self.state.lock();
905917
match *expire_state {
906918
HttpQueryState::ExpireAt(_) => {
919+
let now = Instant::now();
920+
if let Some(drained_at) = *self.data_drained_at.lock() {
921+
if now > drained_at + Duration::from_secs(30) {
922+
*expire_state = HttpQueryState::Stopped(StopReason::Finished);
923+
return false;
924+
}
925+
}
907926
let duration = Duration::from_secs(self.result_timeout_secs);
908-
let deadline = Instant::now() + duration;
909-
*expire_state = HttpQueryState::ExpireAt(deadline);
927+
*expire_state = HttpQueryState::ExpireAt(now + duration);
910928
true
911929
}
912930
HttpQueryState::Stopped(_) => false,

src/query/service/src/servers/http/v1/query/http_query_manager.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ impl HttpQueryManager {
177177
};
178178

179179
match expire_res {
180-
ExpireResult::Expired => {
180+
ExpireResult::Expired(stop_reason) => {
181+
// This message is only used when the query is starting or running.
181182
let msg = format!(
182183
"[HTTP-QUERY] Query {} timed out after {} seconds",
183184
&query_id_clone, query_result_timeout_secs
@@ -186,7 +187,7 @@ impl HttpQueryManager {
186187
.stop_query(
187188
&query_id_clone,
188189
&None,
189-
StopReason::Timeout,
190+
stop_reason,
190191
ErrorCode::AbortedQuery(&msg),
191192
)
192193
.await

src/query/service/tests/it/servers/http/http_query_handlers.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -702,27 +702,25 @@ async fn test_result_timeout() -> Result<()> {
702702
.build();
703703
let _fixture = TestFixture::setup_with_config(&config).await?;
704704

705-
let json = serde_json::json!({ "sql": "SELECT 1", "pagination": {"wait_time_secs": 5}, "session": { "settings": {"http_handler_result_timeout_secs": "1"}}});
705+
let json = serde_json::json!({
706+
"sql": "SELECT * from numbers(5)",
707+
"pagination": {"wait_time_secs": 5, "max_rows_per_page": 1,},
708+
"session": { "settings": {"http_handler_result_timeout_secs": "1"}}}
709+
);
706710
let mut req = TestHttpQueryRequest::new(json);
707711
assert_eq!(req.status().await.running_queries_count, 0);
708712
let (status, result, _) = req.fetch_begin().await?;
709713
assert_eq!(req.status().await.running_queries_count, 1);
710714
assert_eq!(status, StatusCode::OK, "{:?}", result);
711715
assert_eq!(result.data.len(), 1);
712716

713-
sleep(std::time::Duration::from_secs(10)).await;
717+
sleep(std::time::Duration::from_secs(6)).await;
714718
let status = req.status().await;
715719
assert_eq!(status.running_queries_count, 0);
716720

717-
let query_id = result.id.clone();
718-
// fail to get page 0 again (e.g. retry) due to timeout
719-
let (status, _, body) = req
720-
.do_request(Method::GET, &format!("/v1/query/{query_id}/page/0",))
721-
.await?;
721+
let (status, _, body) = req.fetch_next().await?;
722722
assert_eq!(status, StatusCode::BAD_REQUEST, "{:?}", body);
723723
assert!(body.contains("timeout"), "{}", body);
724-
// let (status, result, _) = req.fetch_next().await?;
725-
// assert_eq!(status, StatusCode::BAD_REQUEST, "{:?}", result);
726724
Ok(())
727725
}
728726

Lines changed: 27 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
import requests
22
import time
3-
from suites.utils import comparison_output
43

54
auth = ("root", "")
65
STICKY_HEADER = "X-DATABEND-STICKY-NODE"
76

8-
97
session = {"settings": {"http_handler_result_timeout_secs": "3", "max_threads": "32"}}
108

119

@@ -27,57 +25,44 @@ def do_query(query, port=8000):
2725
response = requests.post(url, headers=headers, json=query_payload, auth=auth)
2826
return response.json()
2927

28+
def do_hb(resps):
29+
m = {}
30+
for resp in resps:
31+
m.setdefault(resp.get("node_id"), []).append(resp.get("id"))
32+
headers = {
33+
"Content-Type": "application/json",
34+
}
35+
hb_uri = f"http://localhost:8000/v1/session/heartbeat"
36+
payload = {"node_to_queries": m}
37+
return requests.post(hb_uri, headers=headers, json=payload, auth=auth).json()
3038

31-
@comparison_output(
32-
"""started query 0
33-
started query 1
34-
sending heartbeat 0
35-
sending heartbeat 1
36-
sending heartbeat 2
37-
sending heartbeat 3
38-
sending heartbeat 4
39-
sending heartbeat 5
40-
sending heartbeat 6
41-
sending heartbeat 7
42-
sending heartbeat 8
43-
sending heartbeat 9
44-
continue fetch 0
45-
continue fetch 1
46-
end
47-
"""
48-
)
4939
def test_heartbeat():
50-
query_resp = do_query("select count(*) from system.clusters")
51-
num_nodes = int(query_resp.get("data")[0][0])
40+
resp0 = do_query("select count(*) from system.clusters")
41+
num_nodes = int(resp0.get("data")[0][0])
5242
port = 8000 if num_nodes == 1 else 8002
5343

5444
resp1 = do_query("select * from numbers(100)")
55-
print("started query 0")
56-
# print(resp1.get("node_id"), resp1.get("id"))
5745
resp2 = do_query("select * from numbers(100)", port=port)
58-
print("started query 1")
59-
# print(resp1.get("node_id"), resp1.get("id"))
60-
# print(resp2.get("node_id"), resp2.get("id"))
6146

62-
url = f"http://localhost:8000/v1/session/heartbeat"
63-
m = {}
64-
m.setdefault(resp1.get("node_id"), []).append(resp1.get("id"))
65-
m.setdefault(resp2.get("node_id"), []).append(resp2.get("id"))
66-
payload = {"node_to_queries": m}
67-
headers = {
68-
"Content-Type": "application/json",
69-
}
7047
for i in range(10):
71-
print(f"sending heartbeat {i}")
72-
response = requests.post(url, headers=headers, json=payload, auth=auth).json()
73-
assert len(response.get("queries_to_remove")) == 0
48+
response = do_hb([resp1, resp2])
49+
assert len(response.get("queries_to_remove")) == 0, f"heartbeat {i}: {response}"
7450
time.sleep(1)
7551

7652
for i, r in enumerate([resp1, resp2]):
77-
print(f"continue fetch {i}")
7853
headers = {STICKY_HEADER: r.get("node_id")}
7954
next_uri = f"http://localhost:8000/{r.get('next_uri')}?"
8055
response = requests.get(next_uri, headers=headers, auth=auth)
81-
assert response.status_code == 200, f"{response.status_code} {response.text}"
82-
assert len(response.json().get("data")) > 0
83-
print("end")
56+
assert response.status_code == 200, f"query {i}:{response.status_code} {response.text}"
57+
response = response.json()
58+
assert len(response.get("data")) > 0, f"query {i}: {response}"
59+
60+
response = do_hb([resp0])
61+
assert len(response.get("queries_to_remove")) == 1, f"resp0: {response}"
62+
63+
final_uri = f"http://localhost:8000/{resp0.get('final_uri')}?"
64+
headers = {STICKY_HEADER: resp0.get("node_id")}
65+
response = requests.get(final_uri, headers=headers, auth=auth)
66+
assert response.status_code == 200, f"{response}"
67+
response = response.json()
68+
assert response["error"] is None

0 commit comments

Comments
 (0)