2 changes: 2 additions & 0 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
@@ -310,6 +310,7 @@ async fn query_final_handler(
             &ctx.client_session_id,
             StopReason::Finished,
             ErrorCode::ClosedQuery("Query closed by client"),
+            true,
         )
         .await?
     {
@@ -351,6 +352,7 @@ async fn query_cancel_handler(
             &ctx.client_session_id,
             StopReason::Canceled,
             ErrorCode::AbortedQuery("canceled by client"),
+            true,
         )
         .await?
     {
35 changes: 27 additions & 8 deletions src/query/service/src/servers/http/v1/query/http_query.rs
@@ -413,7 +413,7 @@ impl HttpSessionConf {
         ) {
             warn!(
                 "[HTTP-QUERY] Last query id not finished yet, id = {}, state = {:?}, is_data_drained={}",
-                id, state, last_query.is_data_drained.load(Ordering::Relaxed)
+                id, state, last_query.data_drained_at.lock().is_some()
             );
         }
     }
@@ -517,7 +517,7 @@ pub enum HttpQueryState {

 #[derive(Debug)]
 pub enum ExpireResult {
-    Expired,
+    Expired(StopReason),
     Sleep(Duration),
     Stopped,
 }
@@ -531,7 +531,11 @@ pub struct HttpQuery {
     executor: Arc<Mutex<Executor>>,
     page_manager: Arc<TokioMutex<PageManager>>,
     state: Arc<Mutex<HttpQueryState>>,
-    is_data_drained: AtomicBool,
+    // The user may never call conn.close(). To avoid meaningless timeout checks and
+    // heartbeats, mark the query as Finished and stop it 30s after the last page is
+    // returned, since a client retry should not take as long as result_timeout_secs.
+    // heartbeat_handler should then notify the client that heartbeats are no longer needed.
+    data_drained_at: Mutex<Option<Instant>>,
     /// The timeout for the query result polling. In the normal case, the client driver
     /// should fetch the paginated result in a timely manner, and the interval should not
     /// exceed this result_timeout_secs.
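In effect, `data_drained_at` is a latch: only the first drain is recorded, and later checks only ask whether it is set. A minimal sketch of that behavior, using a std `Mutex` instead of the crate's and a hypothetical `DrainMarker` type in place of `HttpQuery`:

use std::sync::Mutex;
use std::time::Instant;

// Stand-in for HttpQuery's new field: records only the FIRST time the
// client finishes draining the result, mirroring set_data_drained below.
struct DrainMarker(Mutex<Option<Instant>>);

impl DrainMarker {
    fn set_data_drained(&self) {
        let mut guard = self.0.lock().unwrap();
        if guard.is_none() {
            *guard = Some(Instant::now());
        }
    }

    fn is_drained(&self) -> bool {
        self.0.lock().unwrap().is_some()
    }
}

fn main() {
    let m = DrainMarker(Mutex::new(None));
    assert!(!m.is_drained());
    m.set_data_drained();
    m.set_data_drained(); // second call is a no-op; the first instant is kept
    assert!(m.is_drained());
}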
@@ -641,7 +645,7 @@ impl HttpQuery {
             query_mem_stat: ctx.get_query_memory_tracking(),
             is_txn_mgr_saved: Default::default(),
             last_session_conf: Default::default(),
-            is_data_drained: AtomicBool::new(false),
+            data_drained_at: Default::default(),
         })
     }

@@ -869,7 +873,10 @@ impl HttpQuery {
     }

     pub fn set_data_drained(&self) {
-        self.is_data_drained.store(true, Ordering::Relaxed);
+        let mut guard = self.data_drained_at.lock();
+        if guard.is_none() {
+            *guard = Some(Instant::now())
+        }
     }

     pub fn check_removed(&self) -> Option<StopReason> {
@@ -884,12 +891,17 @@
     // return Duration to sleep
     #[async_backtrace::framed]
     pub async fn check_expire(&self) -> ExpireResult {
+        let is_drained = self.data_drained_at.lock().is_some();
         let expire_state = self.state.lock();
         match *expire_state {
             HttpQueryState::ExpireAt(expire_at) => {
                 let now = Instant::now();
                 if now >= expire_at {
-                    ExpireResult::Expired
+                    ExpireResult::Expired(if is_drained {
+                        StopReason::Finished
+                    } else {
+                        StopReason::Timeout
+                    })
                 } else {
                     ExpireResult::Sleep(expire_at - now)
                 }
@@ -898,15 +910,22 @@
         }
     }

+    /// Returns false if there is no need to send heartbeats anymore.
     #[async_backtrace::framed]
     #[fastrace::trace(name = "HttpQuery::on_heartbeat")]
     pub fn on_heartbeat(&self) -> bool {
         let mut expire_state = self.state.lock();
         match *expire_state {
             HttpQueryState::ExpireAt(_) => {
+                let now = Instant::now();
+                if let Some(drained_at) = *self.data_drained_at.lock() {
+                    if now > drained_at + Duration::from_secs(self.result_timeout_secs.min(30)) {
+                        *expire_state = HttpQueryState::Stopped(StopReason::Finished);
+                        return false;
+                    }
+                }
                 let duration = Duration::from_secs(self.result_timeout_secs);
-                let deadline = Instant::now() + duration;
-                *expire_state = HttpQueryState::ExpireAt(deadline);
+                *expire_state = HttpQueryState::ExpireAt(now + duration);
                 true
             }
             HttpQueryState::Stopped(_) => false,
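Taken together, the two methods above split the old single timeout into two outcomes. A condensed sketch of the decision logic, as free functions with simplified types rather than the actual `HttpQuery` methods:

use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum StopReason { Finished, Timeout }

// check_expire: once the deadline passes, a fully drained query ends as
// Finished (a normal completion), an abandoned one as Timeout (an error).
fn expire_reason(is_drained: bool) -> StopReason {
    if is_drained { StopReason::Finished } else { StopReason::Timeout }
}

// on_heartbeat: keep extending the deadline while the client still needs the
// result; once the data has been drained for more than
// min(result_timeout_secs, 30) seconds, report false so the client can stop
// sending heartbeats for this query.
fn keep_heartbeating(now: Instant, drained_at: Option<Instant>, result_timeout_secs: u64) -> bool {
    match drained_at {
        Some(t) if now > t + Duration::from_secs(result_timeout_secs.min(30)) => false,
        _ => true,
    }
}

fn main() {
    let start = Instant::now();
    assert_eq!(expire_reason(true), StopReason::Finished);
    assert_eq!(expire_reason(false), StopReason::Timeout);
    // Drained 31s ago with a 60s result timeout: stop heartbeating.
    assert!(!keep_heartbeating(start + Duration::from_secs(31), Some(start), 60));
    // Still within the 30s grace window: keep going.
    assert!(keep_heartbeating(start + Duration::from_secs(10), Some(start), 60));
}

With result_timeout_secs well above 30, a drained query is now reaped after roughly 30 seconds instead of lingering for the full timeout.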
@@ -177,7 +177,8 @@ impl HttpQueryManager {
         };

         match expire_res {
-            ExpireResult::Expired => {
+            ExpireResult::Expired(stop_reason) => {
+                // this message only applies while the query is still starting or running
                 let msg = format!(
                     "[HTTP-QUERY] Query {} timed out after {} seconds",
                     &query_id_clone, query_result_timeout_secs
@@ -186,8 +187,9 @@
                     .stop_query(
                         &query_id_clone,
                         &None,
-                        StopReason::Timeout,
+                        stop_reason,
                         ErrorCode::AbortedQuery(&msg),
+                        false,
                     )
                     .await
                     .ok();
@@ -213,14 +215,15 @@
         client_session_id: &Option<String>,
         reason: StopReason,
         error: ErrorCode,
+        check_session_id: bool,
     ) -> poem::error::Result<Option<Arc<HttpQuery>>> {
         let now = SystemTime::now()
             .duration_since(SystemTime::UNIX_EPOCH)
             .expect("Time went backwards")
             .as_secs();
         let (query, stop_first_run) = self.queries.write().stop(query_id, reason, now);
         if let Some(q) = &query {
-            if reason != StopReason::Timeout {
+            if check_session_id {
                 q.check_client_session_id(client_session_id)?;
             }
             if stop_first_run {
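The new check_session_id flag replaces the old reason != StopReason::Timeout test with an explicit statement of intent at each call site. Condensed from the hunks above (receiver name illustrative):

// Client-initiated stop (/final or /kill): the caller must own the session.
manager.stop_query(
    &query_id,
    &ctx.client_session_id,
    StopReason::Finished,
    ErrorCode::ClosedQuery("Query closed by client"),
    true, // check_session_id
).await?;

// Background expiry task: no client session is attached, so the check is skipped.
manager.stop_query(
    &query_id_clone,
    &None,
    stop_reason,
    ErrorCode::AbortedQuery(&msg),
    false, // check_session_id
).await.ok();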
16 changes: 7 additions & 9 deletions src/query/service/tests/it/servers/http/http_query_handlers.rs
@@ -702,27 +702,25 @@ async fn test_result_timeout() -> Result<()> {
         .build();
     let _fixture = TestFixture::setup_with_config(&config).await?;

-    let json = serde_json::json!({ "sql": "SELECT 1", "pagination": {"wait_time_secs": 5}, "session": { "settings": {"http_handler_result_timeout_secs": "1"}}});
+    let json = serde_json::json!({
+        "sql": "SELECT * from numbers(5)",
+        "pagination": {"wait_time_secs": 5, "max_rows_per_page": 1,},
+        "session": { "settings": {"http_handler_result_timeout_secs": "1"}}}
+    );
     let mut req = TestHttpQueryRequest::new(json);
     assert_eq!(req.status().await.running_queries_count, 0);
     let (status, result, _) = req.fetch_begin().await?;
     assert_eq!(req.status().await.running_queries_count, 1);
     assert_eq!(status, StatusCode::OK, "{:?}", result);
     assert_eq!(result.data.len(), 1);

-    sleep(std::time::Duration::from_secs(10)).await;
+    sleep(std::time::Duration::from_secs(6)).await;
     let status = req.status().await;
     assert_eq!(status.running_queries_count, 0);

-    let query_id = result.id.clone();
-    // fail to get page 0 again (e.g. retry) due to timeout
-    let (status, _, body) = req
-        .do_request(Method::GET, &format!("/v1/query/{query_id}/page/0",))
-        .await?;
+    let (status, _, body) = req.fetch_next().await?;
     assert_eq!(status, StatusCode::BAD_REQUEST, "{:?}", body);
     assert!(body.contains("timeout"), "{}", body);
-    // let (status, result, _) = req.fetch_next().await?;
-    // assert_eq!(status, StatusCode::BAD_REQUEST, "{:?}", result);
     Ok(())
 }

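Note how the reshaped test exercises the new code path: returning five rows one page at a time means fetch_begin leaves the result only partially drained, so the query must still expire through the Timeout path, hence the BAD_REQUEST body containing "timeout" on retry, and the shorter 6-second sleep still leaves ample margin over the 1-second result timeout.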
@@ -1,11 +1,9 @@
 import requests
 import time
-from suites.utils import comparison_output

 auth = ("root", "")
 STICKY_HEADER = "X-DATABEND-STICKY-NODE"

-
 session = {"settings": {"http_handler_result_timeout_secs": "3", "max_threads": "32"}}


@@ -27,57 +25,44 @@ def do_query(query, port=8000):
     response = requests.post(url, headers=headers, json=query_payload, auth=auth)
     return response.json()

+def do_hb(resps):
+    m = {}
+    for resp in resps:
+        m.setdefault(resp.get("node_id"), []).append(resp.get("id"))
+    headers = {
+        "Content-Type": "application/json",
+    }
+    hb_uri = f"http://localhost:8000/v1/session/heartbeat"
+    payload = {"node_to_queries": m}
+    return requests.post(hb_uri, headers=headers, json=payload, auth=auth).json()

-@comparison_output(
-    """started query 0
-started query 1
-sending heartbeat 0
-sending heartbeat 1
-sending heartbeat 2
-sending heartbeat 3
-sending heartbeat 4
-sending heartbeat 5
-sending heartbeat 6
-sending heartbeat 7
-sending heartbeat 8
-sending heartbeat 9
-continue fetch 0
-continue fetch 1
-end
-"""
-)
 def test_heartbeat():
-    query_resp = do_query("select count(*) from system.clusters")
-    num_nodes = int(query_resp.get("data")[0][0])
+    resp0 = do_query("select count(*) from system.clusters")
+    num_nodes = int(resp0.get("data")[0][0])
     port = 8000 if num_nodes == 1 else 8002

     resp1 = do_query("select * from numbers(100)")
-    print("started query 0")
+    # print(resp1.get("node_id"), resp1.get("id"))
     resp2 = do_query("select * from numbers(100)", port=port)
-    print("started query 1")
-    # print(resp1.get("node_id"), resp1.get("id"))
-    # print(resp2.get("node_id"), resp2.get("id"))
-
-    url = f"http://localhost:8000/v1/session/heartbeat"
-    m = {}
-    m.setdefault(resp1.get("node_id"), []).append(resp1.get("id"))
-    m.setdefault(resp2.get("node_id"), []).append(resp2.get("id"))
-    payload = {"node_to_queries": m}
-    headers = {
-        "Content-Type": "application/json",
-    }
     for i in range(10):
-        print(f"sending heartbeat {i}")
-        response = requests.post(url, headers=headers, json=payload, auth=auth).json()
-        assert len(response.get("queries_to_remove")) == 0
+        response = do_hb([resp1, resp2])
+        assert len(response.get("queries_to_remove")) == 0, f"heartbeat {i}: {response}"
         time.sleep(1)

     for i, r in enumerate([resp1, resp2]):
-        print(f"continue fetch {i}")
         headers = {STICKY_HEADER: r.get("node_id")}
         next_uri = f"http://localhost:8000/{r.get('next_uri')}?"
         response = requests.get(next_uri, headers=headers, auth=auth)
-        assert response.status_code == 200, f"{response.status_code} {response.text}"
-        assert len(response.json().get("data")) > 0
-        print("end")
+        assert response.status_code == 200, f"query {i}:{response.status_code} {response.text}"
+        response = response.json()
+        assert len(response.get("data")) > 0, f"query {i}: {response}"

+    response = do_hb([resp0])
+    assert len(response.get("queries_to_remove")) == 1, f"resp0: {response}"
+
+    final_uri = f"http://localhost:8000/{resp0.get('final_uri')}?"
+    headers = {STICKY_HEADER: resp0.get("node_id")}
+    response = requests.get(final_uri, headers=headers, auth=auth)
+    assert response.status_code == 200, f"{response}"
+    response = response.json()
+    assert response["error"] is None