Skip to content

Commit 9c0b0b5

Browse files
committed
add aggregated_translator_triggers_fallback_on_close_channel_message
1 parent 1f924ad commit 9c0b0b5

File tree

1 file changed

+156
-0
lines changed

1 file changed

+156
-0
lines changed

integration-tests/tests/translator_integration.rs

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,3 +1263,159 @@ async fn non_aggregated_translator_correctly_deals_with_close_channel_message()
12631263
)
12641264
.await;
12651265
}
1266+
1267+
/// This test launches a tProxy in aggregated mode and leverages two MockUpstreams to test the
1268+
/// correct behavior of handling CloseChannel messages.
1269+
///
1270+
/// We first send a CloseChannel message to a single channel, and assert that no shares are
1271+
/// submitted from it. Then we send a CloseChannel message to the group channel, and assert that no
1272+
/// shares are submitted from any channel.
1273+
#[tokio::test]
1274+
async fn aggregated_translator_triggers_fallback_on_close_channel_message() {
1275+
start_tracing();
1276+
1277+
// first upstream server mock
1278+
let mock_upstream_addr_a = get_available_address();
1279+
let mock_upstream_a = MockUpstream::new(mock_upstream_addr_a);
1280+
let send_to_tproxy_a = mock_upstream_a.start().await;
1281+
let (sniffer_a, sniffer_addr_a) = start_sniffer("", mock_upstream_addr_a, false, vec![], None);
1282+
1283+
// fallback upstream server mock
1284+
let mock_upstream_addr_b = get_available_address();
1285+
let mock_upstream_b = MockUpstream::new(mock_upstream_addr_b);
1286+
let _send_to_tproxy_b = mock_upstream_b.start().await;
1287+
let (sniffer_b, sniffer_addr_b) = start_sniffer("", mock_upstream_addr_b, false, vec![], None);
1288+
1289+
let (_tproxy, tproxy_addr) = start_sv2_translator(
1290+
&[sniffer_addr_a, sniffer_addr_b],
1291+
true,
1292+
vec![],
1293+
vec![],
1294+
None,
1295+
)
1296+
.await;
1297+
1298+
sniffer_a
1299+
.wait_for_message_type_and_clean_queue(
1300+
MessageDirection::ToUpstream,
1301+
MESSAGE_TYPE_SETUP_CONNECTION,
1302+
)
1303+
.await;
1304+
1305+
let setup_connection_success = AnyMessage::Common(CommonMessages::SetupConnectionSuccess(
1306+
SetupConnectionSuccess {
1307+
used_version: 2,
1308+
flags: 0,
1309+
},
1310+
));
1311+
send_to_tproxy_a
1312+
.send(setup_connection_success)
1313+
.await
1314+
.unwrap();
1315+
1316+
// we need to keep references to each minerd
1317+
// otherwise they would be dropped
1318+
let mut minerd_vec = Vec::new();
1319+
1320+
let (minerd_process, _minerd_addr) = start_minerd(tproxy_addr, None, None, false).await;
1321+
minerd_vec.push(minerd_process);
1322+
1323+
sniffer_a
1324+
.wait_for_message_type(
1325+
MessageDirection::ToUpstream,
1326+
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL,
1327+
)
1328+
.await;
1329+
let open_extended_mining_channel: OpenExtendedMiningChannel = loop {
1330+
match sniffer_a.next_message_from_downstream() {
1331+
Some((_, AnyMessage::Mining(parsers_sv2::Mining::OpenExtendedMiningChannel(msg)))) => {
1332+
break msg;
1333+
}
1334+
_ => continue,
1335+
};
1336+
};
1337+
1338+
let open_extended_mining_channel_success = AnyMessage::Mining(
1339+
parsers_sv2::Mining::OpenExtendedMiningChannelSuccess(OpenExtendedMiningChannelSuccess {
1340+
request_id: open_extended_mining_channel.request_id,
1341+
channel_id: 0,
1342+
target: hex::decode("0000137c578190689425e3ecf8449a1af39db0aed305d9206f45ac32fe8330fc")
1343+
.unwrap()
1344+
.try_into()
1345+
.unwrap(),
1346+
// full extranonce has a total of 12 bytes
1347+
extranonce_size: 8,
1348+
extranonce_prefix: vec![0x00, 0x01, 0x00, 0x00].try_into().unwrap(),
1349+
group_channel_id: 100,
1350+
}),
1351+
);
1352+
send_to_tproxy_a
1353+
.send(open_extended_mining_channel_success)
1354+
.await
1355+
.unwrap();
1356+
1357+
sniffer_a
1358+
.wait_for_message_type_and_clean_queue(
1359+
MessageDirection::ToDownstream,
1360+
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL_SUCCESS,
1361+
)
1362+
.await;
1363+
1364+
let new_extended_mining_job = AnyMessage::Mining(parsers_sv2::Mining::NewExtendedMiningJob(NewExtendedMiningJob {
1365+
channel_id: 0,
1366+
job_id: 1,
1367+
min_ntime: Sv2Option::new(None),
1368+
version: 0x20000000,
1369+
version_rolling_allowed: true,
1370+
merkle_path: Seq0255::new(vec![]).unwrap(),
1371+
// scriptSig for a total of 8 bytes of extranonce
1372+
coinbase_tx_prefix: hex::decode("02000000010000000000000000000000000000000000000000000000000000000000000000ffffffff265200162f5374726174756d2056322053524920506f6f6c2f2f08").unwrap().try_into().unwrap(),
1373+
coinbase_tx_suffix: hex::decode("feffffff0200f2052a01000000160014ebe1b7dcc293ccaa0ee743a86f89df8258c208fc0000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf901000000").unwrap().try_into().unwrap(),
1374+
}));
1375+
1376+
send_to_tproxy_a
1377+
.send(new_extended_mining_job)
1378+
.await
1379+
.unwrap();
1380+
sniffer_a
1381+
.wait_for_message_type_and_clean_queue(
1382+
MessageDirection::ToDownstream,
1383+
MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB,
1384+
)
1385+
.await;
1386+
1387+
let set_new_prev_hash =
1388+
AnyMessage::Mining(parsers_sv2::Mining::SetNewPrevHash(SetNewPrevHash {
1389+
channel_id: 0,
1390+
job_id: 1,
1391+
prev_hash: hex::decode(
1392+
"3ab7089cd2cd30f133552cfde82c4cb239cd3c2310306f9d825e088a1772cc39",
1393+
)
1394+
.unwrap()
1395+
.try_into()
1396+
.unwrap(),
1397+
min_ntime: 1766782170,
1398+
nbits: 0x207fffff,
1399+
}));
1400+
1401+
send_to_tproxy_a.send(set_new_prev_hash).await.unwrap();
1402+
sniffer_a
1403+
.wait_for_message_type_and_clean_queue(
1404+
MessageDirection::ToDownstream,
1405+
MESSAGE_TYPE_MINING_SET_NEW_PREV_HASH,
1406+
)
1407+
.await;
1408+
1409+
// up until now, we have done the usual channel initialization process
1410+
// now, lets send a CloseChannel message for the channel
1411+
let close_channel = AnyMessage::Mining(parsers_sv2::Mining::CloseChannel(CloseChannel {
1412+
channel_id: 0,
1413+
reason_code: "".to_string().try_into().unwrap(),
1414+
}));
1415+
send_to_tproxy_a.send(close_channel).await.unwrap();
1416+
1417+
// this should trigger fallback
1418+
sniffer_b
1419+
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
1420+
.await;
1421+
}

0 commit comments

Comments
 (0)