From 8aa7ce2af48399ea0dd73c149fcbf14f9e587d7a Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Tue, 11 Nov 2025 09:10:49 +0100 Subject: [PATCH] wip Signed-off-by: Casper Beyer --- nats/src/nats/js/api.py | 57 +++++++++++++++++++++++++++++++ nats/src/nats/js/client.py | 51 ++++++++++++++++++++++++++-- nats/src/nats/js/manager.py | 67 +++++++++++++++++++++++++++++++++++++ 3 files changed, 173 insertions(+), 2 deletions(-) diff --git a/nats/src/nats/js/api.py b/nats/src/nats/js/api.py index cdd254dbd..b9f208464 100644 --- a/nats/src/nats/js/api.py +++ b/nats/src/nats/js/api.py @@ -33,6 +33,7 @@ class Header(str, Enum): LAST_STREAM = "Nats-Last-Stream" MSG_ID = "Nats-Msg-Id" MSG_TTL = "Nats-TTL" + PIN_ID = "Nats-Pin-Id" ROLLUP = "Nats-Rollup" STATUS = "Status" @@ -46,6 +47,7 @@ class StatusCode(str, Enum): NO_MESSAGES = "404" REQUEST_TIMEOUT = "408" CONFLICT = "409" + LOCKED = "423" CONTROL_MESSAGE = "100" @@ -491,6 +493,21 @@ class ReplayPolicy(str, Enum): ORIGINAL = "original" +class PriorityPolicy(str, Enum): + """Priority policy for pull consumer priority groups. + + Defines how messages are delivered to groups of pull consumers. + Introduced in nats-server 2.11.0. + + References: + * ADR-42: Pull Consumer Priority Groups + """ + + OVERFLOW = "overflow" + PINNED = "pinned" + PRIORITIZED = "prioritized" + + @dataclass class ConsumerConfig(Base): """Consumer configuration. @@ -543,6 +560,20 @@ class ConsumerConfig(Base): # Introduced in nats-server 2.11.0. pause_until: Optional[str] = None + # Priority groups configuration (ADR-42). + # Array of group identifiers. Currently limited to one group. + # Valid group names must match: (A-Z, a-z, 0-9, dash, underscore, fwd-slash, equals)+ + # Groups cannot exceed 16 characters. + # Only supported for pull consumers. + # Introduced in nats-server 2.11.0. + priority_groups: Optional[List[str]] = None + + # Priority policy for message delivery to consumer groups. + # Specifies delivery mechanism: overflow, pinned, or prioritized. + # Requires explicit ack policy and pull consumer. + # Introduced in nats-server 2.11.0. + priority_policy: Optional[PriorityPolicy] = None + @classmethod def from_response(cls, resp: Dict[str, Any]): cls._convert_nanoseconds(resp, "ack_wait") @@ -570,6 +601,19 @@ class SequenceInfo(Base): # last_active: Optional[datetime] +@dataclass +class PriorityGroupState(Base): + """ + PriorityGroupState tracks the state of a priority group. + Used for pinned client policy to track which client is pinned. + Introduced in nats-server 2.11.0. + """ + + pinned_client_id: Optional[str] = None + # FIXME: Do not handle dates for now. + # pinned_ts: Optional[datetime] + + @dataclass class ConsumerInfo(Base): """ @@ -595,6 +639,11 @@ class ConsumerInfo(Base): # RFC 3339 timestamp until which the consumer is paused. # Introduced in nats-server 2.11.0. pause_remaining: Optional[str] = None + # Priority group state tracking (ADR-42). + # Can be either a list (echoing config) or dict mapping group names to state. + # When dict, maps to PriorityGroupState for pinned client tracking. + # Introduced in nats-server 2.11.0. + priority_groups: Optional[Any] = None @classmethod def from_response(cls, resp: Dict[str, Any]): @@ -602,6 +651,14 @@ def from_response(cls, resp: Dict[str, Any]): cls._convert(resp, "ack_floor", SequenceInfo) cls._convert(resp, "config", ConsumerConfig) cls._convert(resp, "cluster", ClusterInfo) + # Convert priority_groups dict values to PriorityGroupState + # Note: priority_groups can be a dict (state tracking) or list (config echo) + if "priority_groups" in resp and resp["priority_groups"]: + if isinstance(resp["priority_groups"], dict): + resp["priority_groups"] = { + k: PriorityGroupState.from_response(v) for k, v in resp["priority_groups"].items() + } + # If it's a list, leave it as-is (it's from config, not state) return super().from_response(resp) diff --git a/nats/src/nats/js/client.py b/nats/src/nats/js/client.py index 0cfa433b0..efe1948ae 100644 --- a/nats/src/nats/js/client.py +++ b/nats/src/nats/js/client.py @@ -1055,6 +1055,12 @@ async def fetch( batch: int = 1, timeout: Optional[float] = 5, heartbeat: Optional[float] = None, + group: Optional[str] = None, + min_pending: Optional[int] = None, + min_ack_pending: Optional[int] = None, + failover: Optional[int] = None, + priority: Optional[int] = None, + pin_id: Optional[str] = None, ) -> List[Msg]: """ fetch makes a request to JetStream to be delivered a set of messages. @@ -1062,6 +1068,12 @@ async def fetch( :param batch: Number of messages to fetch from server. :param timeout: Max duration of the fetch request before it expires. :param heartbeat: Idle Heartbeat interval in seconds for the fetch request. + :param group: Priority group identifier (ADR-42). Required for priority group consumers. + :param min_pending: Minimum number of pending messages for overflow policy (ADR-42). + :param min_ack_pending: Minimum number of ack pending messages for overflow policy (ADR-42). + :param failover: Failover timeout in seconds for overflow policy (ADR-42). Range: 5-3600. + :param priority: Priority level for prioritized policy (ADR-42). Range: 0-9, lower is higher priority. + :param pin_id: Client pin ID for pinned policy (ADR-42). Server-assigned via Nats-Pin-Id header. :: @@ -1093,11 +1105,25 @@ async def main(): if timeout is not None and timeout <= 0: raise ValueError("nats: invalid fetch timeout") + # Validate priority group parameters + if failover is not None and (failover < 5 or failover > 3600): + raise ValueError("nats: failover must be between 5 and 3600 seconds") + if priority is not None and (priority < 0 or priority > 9): + raise ValueError("nats: priority must be between 0 and 9") + expires = int(timeout * 1_000_000_000) - 100_000 if timeout else None + priority_params = { + "group": group, + "min_pending": min_pending, + "min_ack_pending": min_ack_pending, + "failover": failover, + "priority": priority, + "id": pin_id, + } if batch == 1: - msg = await self._fetch_one(expires, timeout, heartbeat) + msg = await self._fetch_one(expires, timeout, heartbeat, priority_params) return [msg] - msgs = await self._fetch_n(batch, expires, timeout, heartbeat) + msgs = await self._fetch_n(batch, expires, timeout, heartbeat, priority_params) return msgs async def _fetch_one( @@ -1105,6 +1131,7 @@ async def _fetch_one( expires: Optional[int], timeout: Optional[float], heartbeat: Optional[float] = None, + priority_params: Optional[Dict[str, Any]] = None, ) -> Msg: queue = self._sub._pending_queue @@ -1131,6 +1158,12 @@ async def _fetch_one( if heartbeat: next_req["idle_heartbeat"] = int(heartbeat * 1_000_000_000) # to nanoseconds + # Add priority group parameters (ADR-42) + if priority_params: + for key, value in priority_params.items(): + if value is not None: + next_req[key] = value + await self._nc.publish( self._nms, json.dumps(next_req).encode(), @@ -1177,6 +1210,7 @@ async def _fetch_n( expires: Optional[int], timeout: Optional[float], heartbeat: Optional[float] = None, + priority_params: Optional[Dict[str, Any]] = None, ) -> List[Msg]: msgs = [] queue = self._sub._pending_queue @@ -1210,6 +1244,13 @@ async def _fetch_n( if heartbeat: next_req["idle_heartbeat"] = int(heartbeat * 1_000_000_000) # to nanoseconds next_req["no_wait"] = True + + # Add priority group parameters (ADR-42) + if priority_params: + for key, value in priority_params.items(): + if value is not None: + next_req[key] = value + await self._nc.publish( self._nms, json.dumps(next_req).encode(), @@ -1272,6 +1313,12 @@ async def _fetch_n( if heartbeat: next_req["idle_heartbeat"] = int(heartbeat * 1_000_000_000) # to nanoseconds + # Add priority group parameters (ADR-42) + if priority_params: + for key, value in priority_params.items(): + if value is not None: + next_req[key] = value + await self._nc.publish( self._nms, json.dumps(next_req).encode(), diff --git a/nats/src/nats/js/manager.py b/nats/src/nats/js/manager.py index 59f6e5f86..c60335fa1 100644 --- a/nats/src/nats/js/manager.py +++ b/nats/src/nats/js/manager.py @@ -203,6 +203,38 @@ async def add_consumer( if config is None: config = api.ConsumerConfig() config = config.evolve(**params) + + # Validate priority groups configuration (ADR-42) + if config.priority_groups or config.priority_policy: + # Both must be set together + if not (config.priority_groups and config.priority_policy): + raise ValueError("nats: priority_groups and priority_policy must both be set") + + # Only pull consumers supported + if config.deliver_subject: + raise ValueError("nats: priority groups only supported for pull consumers") + + # Requires explicit ack policy + if config.ack_policy != api.AckPolicy.EXPLICIT: + raise ValueError("nats: priority groups require explicit ack policy") + + # Validate group names + import re + + valid_group_pattern = re.compile(r"^[A-Za-z0-9\-_/=]+$") + for group in config.priority_groups: + if len(group) > 16: + raise ValueError(f"nats: priority group name '{group}' exceeds 16 characters") + if not valid_group_pattern.match(group): + raise ValueError( + f"nats: priority group name '{group}' is invalid. " + "Must contain only A-Z, a-z, 0-9, dash, underscore, forward slash, or equals" + ) + + # Currently limited to one group + if len(config.priority_groups) > 1: + raise ValueError("nats: only one priority group is currently supported") + durable_name = config.durable_name req = {"stream_name": stream, "config": config.as_dict()} req_data = json.dumps(req).encode() @@ -296,6 +328,41 @@ async def resume_consumer( # Resume by pausing until a time in the past (epoch) return await self.pause_consumer(stream, consumer, "1970-01-01T00:00:00Z", timeout) + async def unpin_consumer( + self, + stream: str, + consumer: str, + timeout: Optional[float] = None, + ) -> Dict[str, Any]: + """ + Unpin a consumer using the pinned client policy (ADR-42). + + This forces the server to select a new pinned client for the consumer. + The current pinned client will receive a 423 status code on its next + fetch request. + + Args: + stream: The stream name + consumer: The consumer name + timeout: Request timeout in seconds + + Returns: + Response from the unpin operation + + Note: + Requires nats-server 2.11.0 or later + Only applies to consumers with pinned priority policy + """ + if timeout is None: + timeout = self._timeout + + resp = await self._api_request( + f"{self._prefix}.CONSUMER.UNPIN.{stream}.{consumer}", + b"", + timeout=timeout, + ) + return resp + async def consumers_info(self, stream: str, offset: Optional[int] = None) -> List[api.ConsumerInfo]: """ consumers_info retrieves a list of consumers. Consumers list limit is 256 for more