Skip to content

Commit dbddeb1

Browse files
mmahroussfda-odoo
authored andcommitted
[IMP] remove read thread
1 parent 2fb0a76 commit dbddeb1

File tree

2 files changed

+50
-230
lines changed

2 files changed

+50
-230
lines changed

server/src/server.rs

Lines changed: 38 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ use serde_json::json;
99
use nix;
1010
use tracing::{error, info, warn};
1111

12-
use crate::{constants::{DEBUG_THREADS, EXTENSION_VERSION}, core::{file_mgr::FileMgr, odoo::SyncOdoo}, threads::{delayed_changes_process_thread, message_processor_thread_main, message_processor_thread_read, DelayedProcessingMessage}, S};
12+
use crate::{constants::{DEBUG_THREADS, EXTENSION_VERSION}, core::{file_mgr::FileMgr, odoo::SyncOdoo}, threads::{delayed_changes_process_thread, message_processor_thread_main, DelayedProcessingMessage}, S};
1313

14-
const THREAD_MAIN_COUNT: u16 = 1;
15-
const THREAD_READ_COUNT: u16 = 1;
1614

1715
/**
1816
* Server handle connection between the client and the extension.
@@ -22,15 +20,11 @@ const THREAD_READ_COUNT: u16 = 1;
2220
pub struct Server {
2321
pub connection: Option<Connection>,
2422
client_process_id: u32,
25-
io_threads: IoThreads,
2623
receivers_w_to_s: Vec<Receiver<Message>>,
2724
msg_id: i32,
28-
id_list: HashMap<RequestId, u16>, //map each request to its thread. firsts ids for main thread, nexts for read ones, last for delayed_process thread
29-
threads: Vec<JoinHandle<()>>,
30-
senders_s_to_main: Vec<Sender<Message>>, // specific channel to threads, to handle responses
31-
sender_s_to_main: Sender<Message>, //unique channel server to all main threads. Will handle new request message
32-
senders_s_to_read: Vec<Sender<Message>>, // specific channel to threads, to handle responses
33-
sender_s_to_read: Sender<Message>, //unique channel server to all read threads
25+
main_thread: JoinHandle<()>,
26+
res_sender_s_to_main: Sender<Message>, // specific channel to threads, to handle responses (main -> s -> client and back)
27+
req_sender_s_to_main: Sender<Message>, //channel server to main threads. Will handle new request message (client -> s -> main and back)
3428
delayed_process_thread: JoinHandle<()>,
3529
sender_to_delayed_process: Sender<DelayedProcessingMessage>, //unique channel to delayed process thread
3630
sync_odoo: Arc<Mutex<SyncOdoo>>,
@@ -75,47 +69,24 @@ impl Server {
7569
Server::init(conn, io_threads)
7670
}
7771

78-
fn init(conn: Connection, io_threads: IoThreads) -> Self {
79-
let mut threads = vec![];
72+
fn init(conn: Connection, _io_threads: IoThreads) -> Self {
8073
let sync_odoo = Arc::new(Mutex::new(SyncOdoo::new()));
8174
let interrupt_rebuild_boolean = sync_odoo.lock().unwrap().interrupt_rebuild.clone();
8275
let terminate_rebuild_boolean = sync_odoo.lock().unwrap().terminate_rebuild.clone();
8376
let mut receivers_w_to_s = vec![];
84-
let mut senders_s_to_main = vec![];
8577
let (sender_to_delayed_process, receiver_delayed_process) = crossbeam_channel::unbounded();
86-
let (generic_sender_s_to_main, generic_receiver_s_to_main) = crossbeam_channel::unbounded(); //unique channel to dispatch to any ready main thread
87-
for _ in 0..THREAD_MAIN_COUNT {
88-
let (sender_s_to_main, receiver_s_to_main) = crossbeam_channel::unbounded();
89-
let (sender_main_to_s, receiver_main_to_s) = crossbeam_channel::unbounded();
90-
senders_s_to_main.push(sender_s_to_main);
91-
receivers_w_to_s.push(receiver_main_to_s);
92-
93-
threads.push({
94-
let sync_odoo = sync_odoo.clone();
95-
let generic_receiver_s_to_main = generic_receiver_s_to_main.clone();
96-
let sender_to_delayed_process = sender_to_delayed_process.clone();
97-
std::thread::spawn(move || {
98-
message_processor_thread_main(sync_odoo, generic_receiver_s_to_main, sender_main_to_s.clone(), receiver_s_to_main.clone(), sender_to_delayed_process);
99-
})
100-
});
101-
}
102-
103-
let mut senders_s_to_read = vec![];
104-
let (generic_sender_s_to_read, generic_receiver_s_to_read) = crossbeam_channel::unbounded(); //unique channel to dispatch to any ready read thread
105-
for _ in 0..THREAD_READ_COUNT {
106-
let (sender_s_to_read, receiver_s_to_read) = crossbeam_channel::unbounded();
107-
let (sender_read_to_s, receiver_read_to_s) = crossbeam_channel::unbounded();
108-
senders_s_to_read.push(sender_s_to_read);
109-
receivers_w_to_s.push(receiver_read_to_s);
110-
threads.push({
111-
let sync_odoo = sync_odoo.clone();
112-
let generic_receiver_s_to_read = generic_receiver_s_to_read.clone();
113-
let sender_to_delayed_process = sender_to_delayed_process.clone();
114-
std::thread::spawn(move || {
115-
message_processor_thread_read(sync_odoo, generic_receiver_s_to_read.clone(), sender_read_to_s.clone(), receiver_s_to_read.clone(), sender_to_delayed_process);
116-
})
117-
});
118-
}
78+
let (req_sender_s_to_main, generic_receiver_s_to_main) = crossbeam_channel::unbounded(); //unique channel to dispatch to any ready main thread
79+
let (res_sender_s_to_main, receiver_s_to_main) = crossbeam_channel::unbounded();
80+
let (sender_main_to_s, receiver_main_to_s) = crossbeam_channel::unbounded();
81+
receivers_w_to_s.push(receiver_main_to_s);
82+
83+
let main_thread = {
84+
let sync_odoo = sync_odoo.clone();
85+
let sender_to_delayed_process = sender_to_delayed_process.clone();
86+
std::thread::spawn(move || {
87+
message_processor_thread_main(sync_odoo, generic_receiver_s_to_main, sender_main_to_s.clone(), receiver_s_to_main.clone(), sender_to_delayed_process);
88+
})
89+
};
11990

12091
let (_, receiver_s_to_delayed) = crossbeam_channel::unbounded();
12192
let (sender_delayed_to_s, receiver_delayed_to_s) = crossbeam_channel::unbounded();
@@ -125,27 +96,14 @@ impl Server {
12596
let delayed_process_thread = std::thread::spawn(move || {
12697
delayed_changes_process_thread(sender_delayed_to_s, receiver_s_to_delayed, receiver_delayed_process, so, delayed_process_sender_to_delayed_process)
12798
});
128-
129-
// let (sender_to_server, receiver_to_server) = crossbeam_channel::unbounded();
130-
// let (sender_from_server_reactive, receiver_from_server) = crossbeam_channel::unbounded();
131-
// server.add_receiver(receiver_to_server.clone());
132-
// for i in 0..THREAD_REACTIVE_COUNT {
133-
// threads.push(std::thread::spawn(move || {
134-
// message_processor_thread_reactive(sender_to_server.clone(), receiver_from_server.clone());
135-
// }));
136-
// }
13799
Self {
138100
connection: Some(conn),
139101
client_process_id: 0,
140-
io_threads: io_threads,
141-
id_list: HashMap::new(),
142102
msg_id: 0,
143103
receivers_w_to_s: receivers_w_to_s,
144-
threads: threads,
145-
senders_s_to_main: senders_s_to_main,
146-
sender_s_to_main: generic_sender_s_to_main,
147-
senders_s_to_read: senders_s_to_read,
148-
sender_s_to_read: generic_sender_s_to_read,
104+
main_thread,
105+
req_sender_s_to_main,
106+
res_sender_s_to_main,
149107
sender_to_delayed_process: sender_to_delayed_process,
150108
delayed_process_thread,
151109
sync_odoo: sync_odoo,
@@ -207,8 +165,8 @@ impl Server {
207165
..CompletionOptions::default()
208166
}),
209167
references_provider: Some(OneOf::Right(ReferencesOptions {
210-
work_done_progress_options: WorkDoneProgressOptions {
211-
work_done_progress: Some(false)
168+
work_done_progress_options: WorkDoneProgressOptions {
169+
work_done_progress: Some(false)
212170
}
213171
})),
214172
document_symbol_provider: Some(OneOf::Right(DocumentSymbolOptions{
@@ -272,8 +230,8 @@ impl Server {
272230
})
273231
}));
274232
info!("End of connection initalization.");
275-
self.sender_s_to_main.send(Message::Notification(lsp_server::Notification { method: S!("custom/server/register_capabilities"), params: serde_json::Value::Null })).unwrap();
276-
self.sender_s_to_main.send(Message::Notification(lsp_server::Notification { method: S!("custom/server/init"), params: serde_json::Value::Null })).unwrap();
233+
self.req_sender_s_to_main.send(Message::Notification(lsp_server::Notification { method: S!("custom/server/register_capabilities"), params: serde_json::Value::Null })).unwrap();
234+
self.req_sender_s_to_main.send(Message::Notification(lsp_server::Notification { method: S!("custom/server/init"), params: serde_json::Value::Null })).unwrap();
277235
Ok(())
278236
}
279237

@@ -283,14 +241,8 @@ impl Server {
283241
method: Shutdown::METHOD.to_string(),
284242
params: serde_json::Value::Null,
285243
});
286-
for specific_sender in self.senders_s_to_main.iter() {
287-
self.sender_s_to_main.send(shutdown_notification.clone()).unwrap(); //sent as notification as we already handled the request for the client
288-
specific_sender.send(shutdown_notification.clone()).unwrap(); //send to specific channels too to close pending requests
289-
}
290-
for specific_sender in self.senders_s_to_read.iter() {
291-
self.sender_s_to_read.send(shutdown_notification.clone()).unwrap(); //sent as notification as we already handled the request for the client
292-
specific_sender.send(shutdown_notification.clone()).unwrap(); //send to specific channels too to close pending requests
293-
}
244+
self.req_sender_s_to_main.send(shutdown_notification.clone()).unwrap();
245+
self.res_sender_s_to_main.send(shutdown_notification.clone()).unwrap();
294246
info!(message);
295247
}
296248

@@ -360,13 +312,12 @@ impl Server {
360312
break;
361313
}
362314
}
363-
self.dispatch(msg);
315+
self.forward_message(msg);
364316
} else { // comes from threads
365317
match msg {
366318
Message::Request(mut r) => {
367319
r.id = RequestId::from(self.msg_id);
368320
self.msg_id += 1;
369-
self.id_list.insert(r.id.clone(), index as u16);
370321
self.connection.as_ref().unwrap().sender.send(Message::Request(r)).unwrap();
371322
},
372323
Message::Notification(n) => {
@@ -391,77 +342,40 @@ impl Server {
391342
if let Some(pid_join_handle) = pid_thread {
392343
pid_join_handle.join().unwrap();
393344
}
394-
for thread in self.threads {
395-
thread.join().unwrap();
396-
}
345+
self.main_thread.join().unwrap();
397346
let _ = self.sender_to_delayed_process.send(DelayedProcessingMessage::EXIT);
398347
self.delayed_process_thread.join().unwrap();
399348
exit_no_error_code
400349
}
401350

402351
/* address a message to the right thread. */
403-
fn dispatch(&mut self, msg: Message) {
352+
fn forward_message(&mut self, msg: Message) {
404353
match msg {
405354
Message::Request(r) => {
406355
match r.method.as_str() {
407-
HoverRequest::METHOD | GotoDefinition::METHOD | References::METHOD => {
356+
HoverRequest::METHOD | GotoDefinition::METHOD | References::METHOD | DocumentSymbolRequest::METHOD=> {
408357
self.interrupt_rebuild_boolean.store(true, std::sync::atomic::Ordering::SeqCst);
409358
if DEBUG_THREADS {
410-
info!("Sending request to read thread : {} - {}", r.method, r.id);
359+
info!("Sending request to main thread : {} - {}", r.method, r.id);
411360
}
412-
self.sender_s_to_read.send(Message::Request(r)).unwrap();
361+
self.req_sender_s_to_main.send(Message::Request(r)).unwrap();
413362
},
414363
Completion::METHOD => {
415364
self.interrupt_rebuild_boolean.store(true, std::sync::atomic::Ordering::SeqCst);
416365
if DEBUG_THREADS {
417366
info!("Sending request to main thread : {} - {}", r.method, r.id);
418367
}
419-
self.sender_s_to_main.send(Message::Request(r)).unwrap();
368+
self.req_sender_s_to_main.send(Message::Request(r)).unwrap();
420369
},
421-
DocumentSymbolRequest::METHOD => {
422-
if DEBUG_THREADS {
423-
info!("Sending request to read thread : {} - {}", r.method, r.id);
424-
}
425-
self.sender_s_to_read.send(Message::Request(r)).unwrap();
426-
}
427370
ResolveCompletionItem::METHOD => {
428371
info!("Got ignored CompletionItem/resolve")
429372
}
430373
_ => {panic!("Not handled Request Id: {}", r.method)}
431374
}
432375
},
433376
Message::Response(r) => {
434-
let thread_id = self.id_list.get(&r.id);
435-
if let Some(thread_id) = thread_id {
436-
if *thread_id == 0_u16 {
437-
panic!("thread_id can't be equal to 0. Client can't respond to itself");
438-
} else {
439-
let mut t_id = thread_id - 1;
440-
if t_id < THREAD_MAIN_COUNT {
441-
if DEBUG_THREADS {
442-
info!("Sending response to main thread : {}", r.id);
443-
}
444-
self.senders_s_to_main.get(t_id as usize).unwrap().send(Message::Response(r)).unwrap();
445-
return;
446-
}
447-
t_id -= THREAD_MAIN_COUNT;
448-
if t_id < THREAD_READ_COUNT {
449-
if DEBUG_THREADS {
450-
info!("Sending response to read thread : {}", r.id);
451-
}
452-
self.senders_s_to_read.get(t_id as usize).unwrap().send(Message::Response(r)).unwrap();
453-
return;
454-
}
455-
// t_id -= THREAD_READ_COUNT;
456-
// if t_id < THREAD_REACTIVE_COUNT {
457-
// self.senders_s_to_react.get(t_id as usize).unwrap().send(msg);
458-
// return;
459-
// }
460-
panic!("invalid thread id");
461-
}
462-
} else {
463-
panic!("Got a response for an unknown request: {:?}", r);
464-
}
377+
info!("Sending response to main thread : {}", r.id);
378+
self.res_sender_s_to_main.send(Message::Response(r)).unwrap();
465379
},
466380
Message::Notification(n) => {
467381
match n.method.as_str() {
@@ -472,15 +386,10 @@ impl Server {
472386
info!("Sending notification to main thread : {}", n.method);
473387
}
474388
self.interrupt_rebuild_boolean.store(true, std::sync::atomic::Ordering::SeqCst);
475-
self.sender_s_to_main.send(Message::Notification(n)).unwrap();
476-
}
477-
_ => {
478-
if n.method.starts_with("$/") {
479-
warn!("Not handled message id: {}", n.method);
480-
} else {
481-
error!("Not handled Notification Id: {}", n.method)
482-
}
389+
self.req_sender_s_to_main.send(Message::Notification(n)).unwrap();
483390
}
391+
method if method.starts_with("$/") => warn!("Not handled message id: {}", n.method),
392+
method => error!("Not handled Notification Id: {}", method),
484393
}
485394
}
486395
}

0 commit comments

Comments
 (0)