Skip to content

Commit fdd7b08

Browse files
authored
refactor: simplify session for worksheet. (#18530)
* refactor: simply session for worksheet. * refactor: HttpSessionConf tolerant empty string field in JSON.
1 parent 8686ba1 commit fdd7b08

File tree

4 files changed

+112
-37
lines changed

4 files changed

+112
-37
lines changed

src/query/service/src/servers/http/middleware/session.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,11 @@ impl<E> HTTPSessionEndpoint<E> {
414414
.await?;
415415
login_history.user_name = user_name.clone();
416416

417-
let mut client_session = ClientSession::try_decode(req, &mut client_caps)?;
417+
let mut client_session = if is_worksheet {
418+
ClientSession::try_decode_for_worksheet(req)
419+
} else {
420+
ClientSession::try_decode(req, &mut client_caps)?
421+
};
418422
if client_session.is_none() && !matches!(self.endpoint_kind, EndpointKind::PollQuery) {
419423
info!(
420424
"[HTTP-SESSION] got request without session, url={}, headers={:?}",
@@ -443,13 +447,8 @@ impl<E> HTTPSessionEndpoint<E> {
443447
// log every request, which can be distinguished by `session_id = ''`
444448
login_history.disable_write = true;
445449
}
446-
s.try_refresh_state(
447-
session.get_current_tenant(),
448-
&user_name,
449-
req.cookie(),
450-
is_worksheet,
451-
)
452-
.await?;
450+
s.try_refresh_state(session.get_current_tenant(), &user_name, req.cookie())
451+
.await?;
453452
}
454453

455454
let session = session_manager.register_session(session)?;

src/query/service/src/servers/http/middleware/session_header.rs

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,20 @@ fn make_cookie(name: impl Into<String>, value: impl Into<String>) -> Cookie {
4848
cookie
4949
}
5050

51-
// migrating from cookie to custom header:
52-
// request:
53-
// try read in order 1. custom header 2. cookie
54-
// response:
55-
// always custom header
56-
// cookie only if enabled
51+
#[derive(Clone, Eq, PartialEq)]
52+
pub enum ClientSessionType {
53+
// currently use in drivers
54+
Cookie,
55+
// to be used when cookie is inconvenient
56+
CustomHeader,
57+
// for now used only for worksheet
58+
IDOnly,
59+
}
60+
5761
#[derive(Clone)]
5862
pub struct ClientSession {
5963
pub header: ClientSessionHeader,
60-
61-
pub use_cookie: bool,
64+
pub typ: ClientSessionType,
6265
pub is_new_session: bool,
6366
pub refreshed: bool,
6467
}
@@ -90,24 +93,42 @@ impl ClientSession {
9093
}
9194
}
9295

93-
fn new_session(use_cookie: bool) -> Self {
96+
pub fn try_decode_for_worksheet(req: &Request) -> Option<Self> {
97+
if let Some(v) = req.headers().get(HEADER_SESSION_ID) {
98+
let id = v.to_str().unwrap().to_string().trim().to_owned();
99+
if !id.is_empty() {
100+
return Some(ClientSession {
101+
header: ClientSessionHeader {
102+
id,
103+
last_refresh_time: SystemTime::now(),
104+
},
105+
typ: ClientSessionType::Cookie,
106+
is_new_session: false,
107+
refreshed: false,
108+
});
109+
}
110+
}
111+
None
112+
}
113+
114+
fn new_session(typ: ClientSessionType) -> Self {
94115
let id = Uuid::now_v7().to_string();
95116
info!("[HTTP-SESSION] Created new session with ID: {}", id);
96117
ClientSession {
97118
header: ClientSessionHeader {
98119
id,
99120
last_refresh_time: SystemTime::now(),
100121
},
101-
use_cookie,
122+
typ,
102123
is_new_session: true,
103124
refreshed: false,
104125
}
105126
}
106127

107-
fn old_session(use_cookie: bool, header: ClientSessionHeader) -> Self {
128+
fn old_session(typ: ClientSessionType, header: ClientSessionHeader) -> Self {
108129
ClientSession {
109130
header,
110-
use_cookie,
131+
typ,
111132
is_new_session: false,
112133
refreshed: false,
113134
}
@@ -119,14 +140,16 @@ impl ClientSession {
119140
) -> Result<Option<ClientSession>, String> {
120141
if caps.session_header {
121142
if let Some(v) = headers.get(HEADER_SESSION) {
122-
caps.session_header = true;
123143
let v = v.to_str().unwrap().to_string().trim().to_owned();
124144
if !v.is_empty() {
125145
let header = decode_json_header(HEADER_SESSION, &v)?;
126-
return Ok(Some(Self::old_session(false, header)));
146+
return Ok(Some(Self::old_session(
147+
ClientSessionType::CustomHeader,
148+
header,
149+
)));
127150
};
128151
}
129-
Ok(Some(Self::new_session(false)))
152+
Ok(Some(Self::new_session(ClientSessionType::CustomHeader)))
130153
} else {
131154
Ok(None)
132155
}
@@ -159,9 +182,9 @@ impl ClientSession {
159182
id,
160183
last_refresh_time: last_access_time,
161184
};
162-
Self::old_session(true, header)
185+
Self::old_session(ClientSessionType::Cookie, header)
163186
} else {
164-
let s = Self::new_session(true);
187+
let s = Self::new_session(ClientSessionType::Cookie);
165188
cookie.add(make_cookie(COOKIE_SESSION_ID, &s.header.id));
166189
cookie.add(make_cookie(
167190
COOKIE_LAST_REFRESH_TIME,
@@ -180,13 +203,13 @@ impl ClientSession {
180203
tenant: Tenant,
181204
user_name: &str,
182205
cookie: &CookieJar,
183-
is_worksheet: bool,
184206
) -> databend_common_exception::Result<()> {
185207
let client_session_mgr = ClientSessionManager::instance();
186208
match self.header.last_refresh_time.elapsed() {
187209
Ok(elapsed) => {
188-
// worksheet
189-
if is_worksheet || elapsed > client_session_mgr.min_refresh_interval {
210+
if ClientSessionType::IDOnly == self.typ
211+
|| elapsed > client_session_mgr.min_refresh_interval
212+
{
190213
if client_session_mgr.refresh_in_memory_states(&self.header.id, user_name) {
191214
client_session_mgr
192215
.refresh_session_handle(tenant, user_name.to_string(), &self.header.id)
@@ -198,7 +221,7 @@ impl ClientSession {
198221
);
199222
}
200223
self.refreshed = true;
201-
if self.use_cookie {
224+
if ClientSessionType::Cookie == self.typ {
202225
cookie.add(make_cookie(
203226
COOKIE_LAST_REFRESH_TIME,
204227
unix_ts().as_secs().to_string().as_str(),
@@ -218,13 +241,15 @@ impl ClientSession {
218241

219242
pub fn on_response(&self, resp: &mut Response) {
220243
let mut header = self.header.clone();
221-
if self.refreshed {
222-
header.last_refresh_time = SystemTime::now();
223-
}
224244
resp.headers_mut()
225245
.insert(HEADER_SESSION_ID, header.id.parse().unwrap());
226-
resp.headers_mut()
227-
.insert(HEADER_SESSION, header.encode().parse().unwrap());
246+
if ClientSessionType::CustomHeader == self.typ {
247+
if self.refreshed {
248+
header.last_refresh_time = SystemTime::now();
249+
}
250+
resp.headers_mut()
251+
.insert(HEADER_SESSION, header.encode().parse().unwrap());
252+
}
228253
}
229254
}
230255

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,9 @@ where D: Deserializer<'de> {
307307
let json_string: Option<String> = Option::deserialize(deserializer)?;
308308
match json_string {
309309
Some(s) => {
310+
if s.is_empty() {
311+
return Ok(None);
312+
}
310313
let complex_value = serde_json::from_str(&s).map_err(serde::de::Error::custom)?;
311314
Ok(Some(complex_value))
312315
}
@@ -365,13 +368,19 @@ impl HttpSessionConf {
365368
let query_id = &http_ctx.query_id;
366369
let http_query_manager = HttpQueryManager::instance();
367370
if let Some(catalog) = &self.catalog {
368-
session.set_current_catalog(catalog.clone());
371+
if !catalog.is_empty() {
372+
session.set_current_catalog(catalog.clone());
373+
}
369374
}
370375
if let Some(db) = &self.database {
371-
session.set_current_database(db.clone());
376+
if !db.is_empty() {
377+
session.set_current_database(db.clone());
378+
}
372379
}
373380
if let Some(role) = &self.role {
374-
session.set_current_role_checked(role).await?;
381+
if !role.is_empty() {
382+
session.set_current_role_checked(role).await?;
383+
}
375384
}
376385
// if the secondary_roles are None (which is the common case), it will not send any rpc on validation.
377386
session

tests/suites/1_stateful/09_http_handler/09_0007_session.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,51 @@ def test_no_session():
159159
resp = resp.json()
160160
assert len(resp["data"]) == 2, resp
161161

162+
163+
HEADER_SESSION_ID = "X-DATABEND-SESSION-ID"
164+
HEADER_SESSION_ID_V = "101010"
165+
166+
def do_query_from_worksheet(client, sql, sid=HEADER_SESSION_ID_V):
167+
payload = {"sql": sql, "pagination": {"max_rows_per_page": 2, "wait_time_secs": 10}}
168+
resp = client.post(
169+
query_url,
170+
auth=auth,
171+
headers={"Content-Type": "application/json",
172+
"USER-AGENT": "worksheet",
173+
HEADER_SESSION_ID: sid,
174+
},
175+
json=payload,
176+
)
177+
return resp.json()
178+
179+
def test_worksheet_session():
180+
client = requests.session()
181+
resp = do_query_from_worksheet(client, "select * from numbers(100)")
182+
next_uri = resp.get("next_uri")
183+
184+
resp = client.get(
185+
f"http://localhost:8000/{next_uri}",
186+
auth=auth,
187+
headers={
188+
"USER-AGENT": "worksheet",
189+
HEADER_SESSION_ID: HEADER_SESSION_ID_V
190+
},
191+
)
192+
resp = resp.json()
193+
assert len(resp["data"]) == 2, resp
194+
195+
resp = do_query_from_worksheet(client, "create or replace temp table t09_0007(a int)")
196+
assert resp["state"] == "Succeeded", resp
197+
resp = do_query_from_worksheet(client, "insert into t09_0007 values (1)")
198+
assert resp["state"] == "Succeeded", resp
199+
resp = do_query_from_worksheet(client, "select * from t09_0007")
200+
assert resp["data"] == [["1"]], resp
201+
202+
162203
def main():
163204
test_no_session()
164205
test_session()
206+
test_worksheet_session()
165207

166208

167209
if __name__ == "__main__":

0 commit comments

Comments
 (0)