Skip to content

Commit 37a3478

Browse files
committed
Add cast delivery admission support
1 parent f02ff3f commit 37a3478

21 files changed

Lines changed: 969 additions & 68 deletions

File tree

crates/cast-support/src/delivery.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::BTreeSet;
22

3-
use jacquard_core::{ByteCount, NodeId, RatioPermille};
3+
use jacquard_core::{BroadcastDomainId, ByteCount, NodeId, RatioPermille};
44
use serde::{Deserialize, Serialize};
55

66
use crate::{
@@ -81,6 +81,7 @@ pub struct CastDeliveryObjective {
8181
pub sender: NodeId,
8282
pub receivers: CastReceiverSet,
8383
pub group_id: Option<CastGroupId>,
84+
pub broadcast_domain_id: Option<BroadcastDomainId>,
8485
pub coverage: CastCoverageObjective,
8586
}
8687

@@ -92,6 +93,7 @@ impl CastDeliveryObjective {
9293
sender,
9394
receivers: CastReceiverSet::one(receiver),
9495
group_id: None,
96+
broadcast_domain_id: None,
9597
coverage: CastCoverageObjective::AllReceivers,
9698
}
9799
}
@@ -108,13 +110,15 @@ impl CastDeliveryObjective {
108110
sender,
109111
receivers: CastReceiverSet::new(receivers),
110112
group_id: Some(group_id),
113+
broadcast_domain_id: None,
111114
coverage,
112115
}
113116
}
114117

115118
#[must_use]
116-
pub fn broadcast(
119+
pub fn broadcast_in_domain(
117120
sender: NodeId,
121+
domain_id: BroadcastDomainId,
118122
receivers: impl IntoIterator<Item = NodeId>,
119123
coverage: CastCoverageObjective,
120124
) -> Self {
@@ -123,6 +127,7 @@ impl CastDeliveryObjective {
123127
sender,
124128
receivers: CastReceiverSet::new(receivers),
125129
group_id: None,
130+
broadcast_domain_id: Some(domain_id),
126131
coverage,
127132
}
128133
}
@@ -224,6 +229,7 @@ pub struct MulticastDeliverySupport {
224229
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
225230
pub struct BroadcastDeliverySupport {
226231
pub sender: NodeId,
232+
pub domain_id: BroadcastDomainId,
227233
pub receivers: Vec<ReceiverCoverageEvidence>,
228234
pub support: BroadcastSupportKind,
229235
pub confidence_permille: RatioPermille,
@@ -398,6 +404,7 @@ fn broadcast_delivery_support(
398404
}
399405
Some(BroadcastDeliverySupport {
400406
sender: evidence.sender,
407+
domain_id: objective.broadcast_domain_id?,
401408
receivers,
402409
support: evidence.support,
403410
confidence_permille: confidence,

crates/cast-support/src/multicast.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::{BTreeMap, BTreeSet};
22

3-
use jacquard_core::{ByteCount, NodeId, RatioPermille};
3+
use jacquard_core::{ByteCount, MulticastGroupId, NodeId, RatioPermille};
44
use serde::{Deserialize, Serialize};
55

66
use crate::{
@@ -13,7 +13,19 @@ use crate::{
1313
// proc-macro-scope: Multicast profile evidence shaping is plain helper logic.
1414

1515
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
16-
pub struct CastGroupId(pub Vec<u8>);
16+
pub struct CastGroupId(pub MulticastGroupId);
17+
18+
impl CastGroupId {
19+
#[must_use]
20+
pub const fn new(group_id: MulticastGroupId) -> Self {
21+
Self(group_id)
22+
}
23+
24+
#[must_use]
25+
pub const fn to_route_group_id(&self) -> MulticastGroupId {
26+
self.0
27+
}
28+
}
1729

1830
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1931
pub struct MulticastObservation {

crates/cast-support/tests/delivery.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use jacquard_cast_support::{
66
CastEvidencePolicy, CastGroupId, CastReceiverSet, MulticastObservation,
77
ReceiverCoverageObservation, UnicastObservation,
88
};
9-
use jacquard_core::{ByteCount, DurationMs, NodeId, OrderStamp, RatioPermille, Tick};
9+
use jacquard_core::{
10+
BroadcastDomainId, ByteCount, DurationMs, MulticastGroupId, NodeId, OrderStamp, RatioPermille,
11+
Tick,
12+
};
1013

1114
fn node(byte: u8) -> NodeId {
1215
NodeId([byte; 32])
@@ -62,8 +65,12 @@ fn unicast_observation(to: u8, confidence: u16, reverse: Option<u16>) -> Unicast
6265
}
6366
}
6467

65-
fn group(name: &[u8]) -> CastGroupId {
66-
CastGroupId(name.to_vec())
68+
fn group(byte: u8) -> CastGroupId {
69+
CastGroupId::new(MulticastGroupId([byte; 16]))
70+
}
71+
72+
fn domain(byte: u8) -> BroadcastDomainId {
73+
BroadcastDomainId([byte; 16])
6774
}
6875

6976
#[test]
@@ -106,7 +113,7 @@ fn multicast_delivery_respects_partial_coverage_policy() {
106113
let (evidence, _) = shape_multicast_evidence(
107114
[MulticastObservation {
108115
sender: node(1),
109-
group_id: group(b"team"),
116+
group_id: group(1),
110117
receivers: vec![receiver(2, 900), receiver(3, 800)],
111118
group_pressure_permille: RatioPermille(100),
112119
fanout_limit: 3,
@@ -117,7 +124,7 @@ fn multicast_delivery_respects_partial_coverage_policy() {
117124
);
118125
let objective = CastDeliveryObjective::multicast(
119126
node(1),
120-
group(b"team"),
127+
group(1),
121128
[node(2), node(4)],
122129
CastCoverageObjective::AnyReceiver,
123130
);
@@ -168,8 +175,12 @@ fn broadcast_delivery_can_reject_gateway_assisted_reverse_support() {
168175
],
169176
evidence_policy(),
170177
);
171-
let objective =
172-
CastDeliveryObjective::broadcast(node(1), [node(2)], CastCoverageObjective::AllReceivers);
178+
let objective = CastDeliveryObjective::broadcast_in_domain(
179+
node(1),
180+
domain(1),
181+
[node(2)],
182+
CastCoverageObjective::AllReceivers,
183+
);
173184
let policy = CastDeliveryPolicy {
174185
require_bidirectional: true,
175186
allow_gateway_assisted_broadcast: false,
@@ -214,8 +225,9 @@ fn broadcast_delivery_ordering_is_deterministic() {
214225
],
215226
evidence_policy(),
216227
);
217-
let objective = CastDeliveryObjective::broadcast(
228+
let objective = CastDeliveryObjective::broadcast_in_domain(
218229
node(1),
230+
domain(1),
219231
[node(2), node(3)],
220232
CastCoverageObjective::AnyReceiver,
221233
);

crates/cast-support/tests/multicast.rs

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use jacquard_cast_support::{
44
shape_multicast_evidence, CastEvidenceBounds, CastEvidenceMeta, CastEvidencePolicy,
55
CastGroupId, MulticastObservation, ReceiverCoverageObservation,
66
};
7-
use jacquard_core::{ByteCount, DurationMs, NodeId, OrderStamp, RatioPermille, Tick};
7+
use jacquard_core::{
8+
ByteCount, DurationMs, MulticastGroupId, NodeId, OrderStamp, RatioPermille, Tick,
9+
};
810

911
fn node(byte: u8) -> NodeId {
1012
NodeId([byte; 32])
@@ -39,20 +41,20 @@ fn receiver(byte: u8, confidence: u16) -> ReceiverCoverageObservation {
3941
}
4042
}
4143

42-
fn group(name: &[u8]) -> CastGroupId {
43-
CastGroupId(name.to_vec())
44+
fn group(byte: u8) -> CastGroupId {
45+
CastGroupId::new(MulticastGroupId([byte; 16]))
4446
}
4547

4648
fn observation(
47-
name: &[u8],
49+
group_byte: u8,
4850
receivers: Vec<ReceiverCoverageObservation>,
4951
pressure: u16,
5052
fanout: u32,
5153
order: u64,
5254
) -> MulticastObservation {
5355
MulticastObservation {
5456
sender: node(1),
55-
group_id: group(name),
57+
group_id: group(group_byte),
5658
receivers,
5759
group_pressure_permille: RatioPermille(pressure),
5860
fanout_limit: fanout,
@@ -65,7 +67,7 @@ fn observation(
6567
fn multicast_full_and_partial_coverage_are_explicit() {
6668
let (evidence, _report) = shape_multicast_evidence(
6769
[observation(
68-
b"team",
70+
1,
6971
vec![receiver(2, 800), receiver(3, 700)],
7072
100,
7173
2,
@@ -86,26 +88,23 @@ fn multicast_full_and_partial_coverage_are_explicit() {
8688
fn multicast_group_pressure_reduces_ranking() {
8789
let (evidence, _report) = shape_multicast_evidence(
8890
[
89-
observation(b"quiet", vec![receiver(2, 800)], 100, 1, 1),
90-
observation(b"loaded", vec![receiver(3, 800)], 900, 1, 2),
91+
observation(1, vec![receiver(2, 800)], 100, 1, 1),
92+
observation(2, vec![receiver(3, 800)], 900, 1, 2),
9193
],
9294
policy(),
9395
);
9496

95-
assert_eq!(evidence[0].group_id, group(b"quiet"));
96-
assert_eq!(evidence[1].group_id, group(b"loaded"));
97+
assert_eq!(evidence[0].group_id, group(1));
98+
assert_eq!(evidence[1].group_id, group(2));
9799
}
98100

99101
#[test]
100102
fn multicast_stale_group_evidence_and_bounded_fanout_are_omitted() {
101-
let mut stale = observation(b"stale", vec![receiver(2, 800)], 100, 1, 1);
103+
let mut stale = observation(1, vec![receiver(2, 800)], 100, 1, 1);
102104
stale.meta = meta(1_001, 1);
103105

104106
let (evidence, report) = shape_multicast_evidence(
105-
[
106-
stale,
107-
observation(b"wide", vec![receiver(3, 800)], 100, 4, 2),
108-
],
107+
[stale, observation(2, vec![receiver(3, 800)], 100, 4, 2)],
109108
policy(),
110109
);
111110

@@ -117,14 +116,14 @@ fn multicast_stale_group_evidence_and_bounded_fanout_are_omitted() {
117116
#[test]
118117
fn multicast_receiver_order_is_stable_across_input_ordering() {
119118
let first = observation(
120-
b"group",
119+
1,
121120
vec![receiver(4, 700), receiver(2, 900), receiver(3, 800)],
122121
100,
123122
3,
124123
1,
125124
);
126125
let second = observation(
127-
b"group",
126+
1,
128127
vec![receiver(3, 800), receiver(2, 900), receiver(4, 700)],
129128
100,
130129
3,
@@ -149,7 +148,7 @@ fn multicast_receiver_order_is_stable_across_input_ordering() {
149148
fn multicast_low_confidence_receivers_do_not_force_all_or_nothing_delivery() {
150149
let (evidence, _report) = shape_multicast_evidence(
151150
[observation(
152-
b"mixed",
151+
1,
153152
vec![receiver(2, 800), receiver(3, 400), receiver(4, 700)],
154153
100,
155154
3,
@@ -168,3 +167,23 @@ fn multicast_low_confidence_receivers_do_not_force_all_or_nothing_delivery() {
168167
vec![node(2), node(4)]
169168
);
170169
}
170+
171+
#[test]
172+
fn route_group_identity_is_direct_and_stable() {
173+
let id = MulticastGroupId([9; 16]);
174+
175+
assert_eq!(CastGroupId::new(id).to_route_group_id(), id);
176+
}
177+
178+
#[test]
179+
fn route_group_identity_distinguishes_explicit_ids_with_shared_prefix() {
180+
let mut left = [7; 16];
181+
let mut right = [7; 16];
182+
left[15] = 1;
183+
right[15] = 2;
184+
185+
assert_ne!(
186+
CastGroupId::new(MulticastGroupId(left)).to_route_group_id(),
187+
CastGroupId::new(MulticastGroupId(right)).to_route_group_id()
188+
);
189+
}

crates/cast-support/tests/surface.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use jacquard_cast_support::{
44
CastEvidencePolicy, CastGroupId, MulticastObservation, ReceiverCoverageObservation,
55
UnicastObservation, UnicastSupportKind,
66
};
7-
use jacquard_core::{ByteCount, DurationMs, NodeId, OrderStamp, RatioPermille, Tick};
7+
use jacquard_core::{
8+
ByteCount, DurationMs, MulticastGroupId, NodeId, OrderStamp, RatioPermille, Tick,
9+
};
810

911
fn node(byte: u8) -> NodeId {
1012
NodeId([byte; 32])
@@ -79,7 +81,7 @@ fn multicast_surface_enforces_group_coverage_bounds() {
7981
let (evidence, report) = shape_multicast_evidence(
8082
[MulticastObservation {
8183
sender: node(1),
82-
group_id: CastGroupId(b"group".to_vec()),
84+
group_id: CastGroupId::new(MulticastGroupId([1; 16])),
8385
receivers,
8486
group_pressure_permille: RatioPermille(100),
8587
fanout_limit: 2,

crates/core/src/base/identity.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ super::bytes_newtype!(DiscoveryScopeId, 16);
2727
super::bytes_newtype!(HomeId, 16);
2828
super::bytes_newtype!(ClusterId, 16);
2929
super::bytes_newtype!(GatewayId, 16);
30+
super::bytes_newtype!(MulticastGroupId, 16);
31+
super::bytes_newtype!(BroadcastDomainId, 16);
3032
super::bytes_newtype!(RouteId, 16);
3133
super::bytes_newtype!(RoutingEngineContractId, 16);
3234
super::bytes_newtype!(RouteOperationId, 16);

0 commit comments

Comments
 (0)