Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions nats/src/nats/js/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Header(str, Enum):
LAST_STREAM = "Nats-Last-Stream"
MSG_ID = "Nats-Msg-Id"
MSG_TTL = "Nats-TTL"
PIN_ID = "Nats-Pin-Id"
ROLLUP = "Nats-Rollup"
STATUS = "Status"

Expand All @@ -46,6 +47,7 @@ class StatusCode(str, Enum):
NO_MESSAGES = "404"
REQUEST_TIMEOUT = "408"
CONFLICT = "409"
LOCKED = "423"
CONTROL_MESSAGE = "100"


Expand Down Expand Up @@ -491,6 +493,21 @@ class ReplayPolicy(str, Enum):
ORIGINAL = "original"


class PriorityPolicy(str, Enum):
"""Priority policy for pull consumer priority groups.

Defines how messages are delivered to groups of pull consumers.
Introduced in nats-server 2.11.0.

References:
* ADR-42: Pull Consumer Priority Groups
"""

OVERFLOW = "overflow"
PINNED = "pinned"
PRIORITIZED = "prioritized"


@dataclass
class ConsumerConfig(Base):
"""Consumer configuration.
Expand Down Expand Up @@ -543,6 +560,20 @@ class ConsumerConfig(Base):
# Introduced in nats-server 2.11.0.
pause_until: Optional[str] = None

# Priority groups configuration (ADR-42).
# Array of group identifiers. Currently limited to one group.
# Valid group names must match: (A-Z, a-z, 0-9, dash, underscore, fwd-slash, equals)+
# Groups cannot exceed 16 characters.
# Only supported for pull consumers.
# Introduced in nats-server 2.11.0.
priority_groups: Optional[List[str]] = None

# Priority policy for message delivery to consumer groups.
# Specifies delivery mechanism: overflow, pinned, or prioritized.
# Requires explicit ack policy and pull consumer.
# Introduced in nats-server 2.11.0.
priority_policy: Optional[PriorityPolicy] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert_nanoseconds(resp, "ack_wait")
Expand Down Expand Up @@ -570,6 +601,19 @@ class SequenceInfo(Base):
# last_active: Optional[datetime]


@dataclass
class PriorityGroupState(Base):
"""
PriorityGroupState tracks the state of a priority group.
Used for pinned client policy to track which client is pinned.
Introduced in nats-server 2.11.0.
"""

pinned_client_id: Optional[str] = None
# FIXME: Do not handle dates for now.
# pinned_ts: Optional[datetime]


@dataclass
class ConsumerInfo(Base):
"""
Expand All @@ -595,13 +639,26 @@ class ConsumerInfo(Base):
# RFC 3339 timestamp until which the consumer is paused.
# Introduced in nats-server 2.11.0.
pause_remaining: Optional[str] = None
# Priority group state tracking (ADR-42).
# Can be either a list (echoing config) or dict mapping group names to state.
# When dict, maps to PriorityGroupState for pinned client tracking.
# Introduced in nats-server 2.11.0.
priority_groups: Optional[Any] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "delivered", SequenceInfo)
cls._convert(resp, "ack_floor", SequenceInfo)
cls._convert(resp, "config", ConsumerConfig)
cls._convert(resp, "cluster", ClusterInfo)
# Convert priority_groups dict values to PriorityGroupState
# Note: priority_groups can be a dict (state tracking) or list (config echo)
if "priority_groups" in resp and resp["priority_groups"]:
if isinstance(resp["priority_groups"], dict):
resp["priority_groups"] = {
k: PriorityGroupState.from_response(v) for k, v in resp["priority_groups"].items()
}
# If it's a list, leave it as-is (it's from config, not state)
return super().from_response(resp)


Expand Down
51 changes: 49 additions & 2 deletions nats/src/nats/js/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1055,13 +1055,25 @@ async def fetch(
batch: int = 1,
timeout: Optional[float] = 5,
heartbeat: Optional[float] = None,
group: Optional[str] = None,
min_pending: Optional[int] = None,
min_ack_pending: Optional[int] = None,
failover: Optional[int] = None,
priority: Optional[int] = None,
pin_id: Optional[str] = None,
) -> List[Msg]:
"""
fetch makes a request to JetStream to be delivered a set of messages.

:param batch: Number of messages to fetch from server.
:param timeout: Max duration of the fetch request before it expires.
:param heartbeat: Idle Heartbeat interval in seconds for the fetch request.
:param group: Priority group identifier (ADR-42). Required for priority group consumers.
:param min_pending: Minimum number of pending messages for overflow policy (ADR-42).
:param min_ack_pending: Minimum number of ack pending messages for overflow policy (ADR-42).
:param failover: Failover timeout in seconds for overflow policy (ADR-42). Range: 5-3600.
:param priority: Priority level for prioritized policy (ADR-42). Range: 0-9, lower is higher priority.
:param pin_id: Client pin ID for pinned policy (ADR-42). Server-assigned via Nats-Pin-Id header.

::

Expand Down Expand Up @@ -1093,18 +1105,33 @@ async def main():
if timeout is not None and timeout <= 0:
raise ValueError("nats: invalid fetch timeout")

# Validate priority group parameters
if failover is not None and (failover < 5 or failover > 3600):
raise ValueError("nats: failover must be between 5 and 3600 seconds")
if priority is not None and (priority < 0 or priority > 9):
raise ValueError("nats: priority must be between 0 and 9")

expires = int(timeout * 1_000_000_000) - 100_000 if timeout else None
priority_params = {
"group": group,
"min_pending": min_pending,
"min_ack_pending": min_ack_pending,
"failover": failover,
"priority": priority,
"id": pin_id,
}
if batch == 1:
msg = await self._fetch_one(expires, timeout, heartbeat)
msg = await self._fetch_one(expires, timeout, heartbeat, priority_params)
return [msg]
msgs = await self._fetch_n(batch, expires, timeout, heartbeat)
msgs = await self._fetch_n(batch, expires, timeout, heartbeat, priority_params)
return msgs

async def _fetch_one(
self,
expires: Optional[int],
timeout: Optional[float],
heartbeat: Optional[float] = None,
priority_params: Optional[Dict[str, Any]] = None,
) -> Msg:
queue = self._sub._pending_queue

Expand All @@ -1131,6 +1158,12 @@ async def _fetch_one(
if heartbeat:
next_req["idle_heartbeat"] = int(heartbeat * 1_000_000_000) # to nanoseconds

# Add priority group parameters (ADR-42)
if priority_params:
for key, value in priority_params.items():
if value is not None:
next_req[key] = value

await self._nc.publish(
self._nms,
json.dumps(next_req).encode(),
Expand Down Expand Up @@ -1177,6 +1210,7 @@ async def _fetch_n(
expires: Optional[int],
timeout: Optional[float],
heartbeat: Optional[float] = None,
priority_params: Optional[Dict[str, Any]] = None,
) -> List[Msg]:
msgs = []
queue = self._sub._pending_queue
Expand Down Expand Up @@ -1210,6 +1244,13 @@ async def _fetch_n(
if heartbeat:
next_req["idle_heartbeat"] = int(heartbeat * 1_000_000_000) # to nanoseconds
next_req["no_wait"] = True

# Add priority group parameters (ADR-42)
if priority_params:
for key, value in priority_params.items():
if value is not None:
next_req[key] = value

await self._nc.publish(
self._nms,
json.dumps(next_req).encode(),
Expand Down Expand Up @@ -1272,6 +1313,12 @@ async def _fetch_n(
if heartbeat:
next_req["idle_heartbeat"] = int(heartbeat * 1_000_000_000) # to nanoseconds

# Add priority group parameters (ADR-42)
if priority_params:
for key, value in priority_params.items():
if value is not None:
next_req[key] = value

await self._nc.publish(
self._nms,
json.dumps(next_req).encode(),
Expand Down
67 changes: 67 additions & 0 deletions nats/src/nats/js/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,38 @@ async def add_consumer(
if config is None:
config = api.ConsumerConfig()
config = config.evolve(**params)

# Validate priority groups configuration (ADR-42)
if config.priority_groups or config.priority_policy:
# Both must be set together
if not (config.priority_groups and config.priority_policy):
raise ValueError("nats: priority_groups and priority_policy must both be set")

# Only pull consumers supported
if config.deliver_subject:
raise ValueError("nats: priority groups only supported for pull consumers")

# Requires explicit ack policy
if config.ack_policy != api.AckPolicy.EXPLICIT:
raise ValueError("nats: priority groups require explicit ack policy")

# Validate group names
import re

valid_group_pattern = re.compile(r"^[A-Za-z0-9\-_/=]+$")
for group in config.priority_groups:
if len(group) > 16:
raise ValueError(f"nats: priority group name '{group}' exceeds 16 characters")
if not valid_group_pattern.match(group):
raise ValueError(
f"nats: priority group name '{group}' is invalid. "
"Must contain only A-Z, a-z, 0-9, dash, underscore, forward slash, or equals"
)

# Currently limited to one group
if len(config.priority_groups) > 1:
raise ValueError("nats: only one priority group is currently supported")

durable_name = config.durable_name
req = {"stream_name": stream, "config": config.as_dict()}
req_data = json.dumps(req).encode()
Expand Down Expand Up @@ -296,6 +328,41 @@ async def resume_consumer(
# Resume by pausing until a time in the past (epoch)
return await self.pause_consumer(stream, consumer, "1970-01-01T00:00:00Z", timeout)

async def unpin_consumer(
self,
stream: str,
consumer: str,
timeout: Optional[float] = None,
) -> Dict[str, Any]:
"""
Unpin a consumer using the pinned client policy (ADR-42).

This forces the server to select a new pinned client for the consumer.
The current pinned client will receive a 423 status code on its next
fetch request.

Args:
stream: The stream name
consumer: The consumer name
timeout: Request timeout in seconds

Returns:
Response from the unpin operation

Note:
Requires nats-server 2.11.0 or later
Only applies to consumers with pinned priority policy
"""
if timeout is None:
timeout = self._timeout

resp = await self._api_request(
f"{self._prefix}.CONSUMER.UNPIN.{stream}.{consumer}",
b"",
timeout=timeout,
)
return resp

async def consumers_info(self, stream: str, offset: Optional[int] = None) -> List[api.ConsumerInfo]:
"""
consumers_info retrieves a list of consumers. Consumers list limit is 256 for more
Expand Down
Loading