Skip to content

Commit 4ef5f06

Browse files
author
Michael Taylor
committed
0.1.10
1 parent bfb45b9 commit 4ef5f06

2 files changed

Lines changed: 39 additions & 34 deletions

File tree

src/peer_pool.rs

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,16 @@ pub struct NewPeakHeightEvent {
2525
pub peer_id: String,
2626
}
2727

28+
struct PeerWorkerParams {
29+
peer_connection: PeerConnection,
30+
peer_id: String,
31+
host: String,
32+
port: u16,
33+
disconnected_callback: Arc<RwLock<Option<PeerDisconnectedCallback>>>,
34+
inner: Arc<RwLock<ChiaPeerPoolInner>>,
35+
new_peak_callback: Arc<RwLock<Option<NewPeakHeightCallback>>>,
36+
}
37+
2838
pub struct ChiaPeerPool {
2939
inner: Arc<RwLock<ChiaPeerPoolInner>>,
3040
request_sender: mpsc::Sender<PoolRequest>,
@@ -133,13 +143,15 @@ impl ChiaPeerPool {
133143
tokio::spawn(async move {
134144
Self::peer_worker(
135145
worker_rx,
136-
peer_conn_clone,
137-
peer_id_clone,
138-
host_clone,
139-
port,
140-
disconnected_callback,
141-
inner_clone,
142-
new_peak_callback,
146+
PeerWorkerParams {
147+
peer_connection: peer_conn_clone,
148+
peer_id: peer_id_clone,
149+
host: host_clone,
150+
port,
151+
disconnected_callback,
152+
inner: inner_clone,
153+
new_peak_callback,
154+
},
143155
)
144156
.await;
145157
});
@@ -337,35 +349,29 @@ impl ChiaPeerPool {
337349

338350
async fn peer_worker(
339351
mut receiver: mpsc::Receiver<WorkerRequest>,
340-
peer_connection: PeerConnection,
341-
peer_id: String,
342-
host: String,
343-
port: u16,
344-
disconnected_callback: Arc<RwLock<Option<PeerDisconnectedCallback>>>,
345-
inner: Arc<RwLock<ChiaPeerPoolInner>>,
346-
new_peak_callback: Arc<RwLock<Option<NewPeakHeightCallback>>>,
352+
params: PeerWorkerParams,
347353
) {
348-
info!("Starting worker for peer {}", peer_id);
354+
info!("Starting worker for peer {}", params.peer_id);
349355

350356
while let Some(request) = receiver.recv().await {
351357
match request {
352358
WorkerRequest::GetBlock {
353359
height,
354360
response_tx,
355361
} => {
356-
debug!("Worker {} fetching block at height {}", peer_id, height);
362+
debug!("Worker {} fetching block at height {}", params.peer_id, height);
357363

358364
// Create a new connection for this request
359-
match peer_connection.connect().await {
365+
match params.peer_connection.connect().await {
360366
Ok(mut ws_stream) => {
361367
// Perform handshake
362-
if let Err(e) = peer_connection.handshake(&mut ws_stream).await {
363-
error!("Handshake failed for {}: {}", peer_id, e);
368+
if let Err(e) = params.peer_connection.handshake(&mut ws_stream).await {
369+
error!("Handshake failed for {}: {}", params.peer_id, e);
364370
let _ = response_tx.send(Err(e));
365371
continue;
366372
}
367373

368-
match peer_connection
374+
match params.peer_connection
369375
.request_block_by_height(height, &mut ws_stream)
370376
.await
371377
{
@@ -374,12 +380,12 @@ impl ChiaPeerPool {
374380
Ok(parsed_block) => {
375381
info!(
376382
"Worker {} parsed block at height {}",
377-
peer_id, parsed_block.height
383+
params.peer_id, parsed_block.height
378384
);
379385

380386
// Update peak height for this peer if this is higher than what we've seen
381-
let mut guard = inner.write().await;
382-
if let Some(peer_info) = guard.peers.get_mut(&peer_id) {
387+
let mut guard = params.inner.write().await;
388+
if let Some(peer_info) = guard.peers.get_mut(&params.peer_id) {
383389
match peer_info.peak_height {
384390
Some(current_peak) => {
385391
if parsed_block.height > current_peak {
@@ -409,12 +415,12 @@ impl ChiaPeerPool {
409415

410416
// Emit new peak event
411417
if let Some(callback) =
412-
&*new_peak_callback.read().await
418+
&*params.new_peak_callback.read().await
413419
{
414420
callback(NewPeakHeightEvent {
415421
old_peak,
416422
new_peak: parsed_block.height,
417-
peer_id: peer_id.clone(),
423+
peer_id: params.peer_id.clone(),
418424
});
419425
}
420426
} else {
@@ -431,12 +437,12 @@ impl ChiaPeerPool {
431437

432438
// Emit new peak event
433439
if let Some(callback) =
434-
&*new_peak_callback.read().await
440+
&*params.new_peak_callback.read().await
435441
{
436442
callback(NewPeakHeightEvent {
437443
old_peak,
438444
new_peak: parsed_block.height,
439-
peer_id: peer_id.clone(),
445+
peer_id: params.peer_id.clone(),
440446
});
441447
}
442448
}
@@ -458,24 +464,24 @@ impl ChiaPeerPool {
458464
}
459465
}
460466
Err(e) => {
461-
error!("Connection failed for {}: {}", peer_id, e);
467+
error!("Connection failed for {}: {}", params.peer_id, e);
462468
let _ = response_tx.send(Err(e));
463469
}
464470
}
465471
}
466472
WorkerRequest::Shutdown => {
467-
info!("Shutting down worker for peer {}", peer_id);
473+
info!("Shutting down worker for peer {}", params.peer_id);
468474
break;
469475
}
470476
}
471477
}
472478

473479
// Emit disconnected event when worker shuts down
474-
if let Some(callback) = &*disconnected_callback.read().await {
480+
if let Some(callback) = &*params.disconnected_callback.read().await {
475481
callback(PeerDisconnectedEvent {
476-
peer_id,
477-
host,
478-
port: port as u32,
482+
peer_id: params.peer_id,
483+
host: params.host,
484+
port: params.port as u32,
479485
message: Some("Worker shutdown".to_string()),
480486
});
481487
}

src/peer_pool_napi.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ impl ChiaPeerPool {
128128
#[napi]
129129
pub fn on(&self, event: String, callback: JsFunction) -> Result<()> {
130130
let rt = tokio::runtime::Handle::current();
131-
let listeners = self.listeners.clone();
132131

133132
match event.as_str() {
134133
"peerConnected" => {

0 commit comments

Comments
 (0)