Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sentry-repo
Submodule sentry-repo added at a5d290
186 changes: 171 additions & 15 deletions src/sentry/buffer/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import logging
import pickle
import threading
from collections.abc import Callable
from dataclasses import dataclass
from datetime import date, datetime, timezone
from enum import Enum
from time import time
Expand All @@ -27,12 +27,13 @@
validate_dynamic_cluster,
)

_local_buffers = None
_local_buffers_lock = threading.Lock()

logger = logging.getLogger(__name__)

T = TypeVar("T", str, bytes)


def _get_model_key(model: type[models.Model]) -> str:
    """Return the stable string key for *model*: the ``str()`` of its ``_meta``."""
    meta = model._meta
    return str(meta)
# Debounce our JSON validation a bit in order to not cause too much additional
# load everywhere
_last_validation_log: float | None = None
Expand Down Expand Up @@ -81,6 +82,103 @@ def callback(self, buffer_hook_event: BufferHookEvent) -> bool:
redis_buffer_registry = BufferHookRegistry()


# Callable to get the queue name for the given model_key.
# May return None to not assign a queue for the given model_key.
ChooseQueueFunction = Callable[[str], str | None]


@dataclass
class PendingBufferValue:
model_key: str | None
pending_buffer: "PendingBuffer"
generate_queue: ChooseQueueFunction | None


class PendingBufferRouter:
def __init__(self, incr_batch_size: int) -> None:
self.incr_batch_size = incr_batch_size
self.default_pending_buffer = PendingBuffer(self.incr_batch_size)
# map of model_key to PendingBufferValue
Copy link

Copilot AI Jul 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an extra trailing space at the end of the comment line.

Suggested change
# map of model_key to PendingBufferValue
# map of model_key to PendingBufferValue

Copilot uses AI. Check for mistakes.
self.pending_buffer_router: dict[str, PendingBufferValue] = dict()
Copy link

Copilot AI Jul 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Use {} instead of dict() for better performance and readability when creating empty dictionaries.

Suggested change
self.pending_buffer_router: dict[str, PendingBufferValue] = dict()
self.pending_buffer_router: dict[str, PendingBufferValue] = {}

Copilot uses AI. Check for mistakes.

def create_pending_buffer(self, model_key: str, generate_queue: ChooseQueueFunction) -> None:
"""
Create a PendingBuffer for the given model_key and queue name.
We assume that there will be a dedicated queue for the given model associated with the model_key.
"""
pending_buffer = PendingBuffer(self.incr_batch_size)
self.pending_buffer_router[model_key] = PendingBufferValue(
model_key=model_key, pending_buffer=pending_buffer, generate_queue=generate_queue
)

def get_pending_buffer(self, model_key: str | None) -> "PendingBuffer":
"""
Get the pending buffer assigned to the given model_key.
"""
if model_key is not None and model_key in self.pending_buffer_router:
return self.pending_buffer_router[model_key].pending_buffer
return self.default_pending_buffer

def queue(self, model_key: str) -> str | None:
"""
Get the queue name for the given model_key.
"""
if model_key in self.pending_buffer_router:
generate_queue = self.pending_buffer_router[model_key].generate_queue
if generate_queue is not None:
return generate_queue(model_key)
return None

def pending_buffers(self) -> list[PendingBufferValue]:
pending_buffers = list(self.pending_buffer_router.values())
pending_buffers.append(
PendingBufferValue(
model_key=None, pending_buffer=self.default_pending_buffer, generate_queue=None
)
)
return pending_buffers


class RedisBufferRouter:
    """
    Registry mapping models to the Celery queue used for their process_incr
    work. Populated at import time via ``assign_queue`` and materialized into
    a PendingBufferRouter when processing pending keys.
    """

    def __init__(self) -> None:
        # map of model_key (generated from _get_model_key function) to queue name
        self._routers: dict[str, ChooseQueueFunction] = {}

    def assign_queue(self, model: type[models.Model], generate_queue: ChooseQueueFunction) -> None:
        """
        RedisBuffer is shared among multiple models.
        Thus, the process_incr task and the default assigned queue for it is shared among multiple models.
        If any backlogs or slowdowns occur when incrementing counts for any specific model, other models will be affected.

        To alleviate this, we can assign a dedicated queue for any given model.
        If a dedicated queue is assigned, the process_incr task will be processed in the assigned queue.
        On the other hand, if no dedicated queue is assigned, the process_incr task will be processed in
        the default queue (i.e. counters-0 queue).

        A queue can be assigned to a model by passing in the generate_queue function.
        """
        key = _get_model_key(model=model)
        self._routers[key] = generate_queue

    def create_pending_buffers_router(self, incr_batch_size: int) -> PendingBufferRouter:
        """
        We create a PendingBuffer (with buffer size incr_batch_size) for each model with an assigned queue.
        In addition, we create a default PendingBuffer (with buffer size incr_batch_size) for models without an
        assigned queue. The default PendingBuffer is implicitly assigned to the default queue of the process_incr task.

        These PendingBuffers are wrapped in a PendingBufferRouter.
        """
        pending_buffers_router = PendingBufferRouter(incr_batch_size=incr_batch_size)
        for model_key, generate_queue in self._routers.items():
            pending_buffers_router.create_pending_buffer(
                model_key=model_key, generate_queue=generate_queue
            )
        return pending_buffers_router


# Module-level singleton used by RedisBuffer.process_pending.
redis_buffer_router = RedisBufferRouter()


# Note HMSET is not supported after redis 4.0.0, after updating we can use HSET directly.
class RedisOperation(Enum):
SORTED_SET_ADD = "zadd"
Expand Down Expand Up @@ -146,7 +244,22 @@ def _make_key(self, model: type[models.Model], filters: dict[str, Any]) -> str:
md5 = md5_text(
"&".join(f"{k}={self._coerce_val(v)!r}" for k, v in sorted(filters.items()))
).hexdigest()
return f"b:k:{model._meta}:{md5}"
model_key = _get_model_key(model=model)
return f"b:k:{model_key}:{md5}"

def _extract_model_from_key(self, key: str) -> str | None:
"""
Extracts the model metadata from a Redis key.
"""
try:
parts = key.split(":")

if len(parts) != 4 or parts[0] != "b" or parts[1] != "k":
return None

return parts[2]
except Exception:
return None

def _make_lock_key(self, key: str) -> str:
    """Return the lock-namespace variant (``l:<key>``) of a buffer key."""
    return "l:" + key
Expand Down Expand Up @@ -398,20 +511,48 @@ def process_pending(self) -> None:
if not lock_key:
return

pending_buffer = PendingBuffer(self.incr_batch_size)
pending_buffers_router = redis_buffer_router.create_pending_buffers_router(
incr_batch_size=self.incr_batch_size
)

def _generate_process_incr_kwargs(model_key: str | None) -> dict[str, Any]:
# The queue to be used for the process_incr task is determined in the following order of precedence:
# 1. The queue argument passed to process_incr.apply_async()
# 2. The queue defined on the process_incr task
# 3. Any defined routes in CELERY_ROUTES
#
# See: https://docs.celeryq.dev/en/latest/userguide/routing.html#specifying-task-destination
#
# Hence, we override the default queue of the process_incr task by passing in the assigned queue for the
# model associated with the model_key.
process_incr_kwargs: dict[str, Any] = dict()
Copy link

Copilot AI Jul 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Use {} instead of dict() for better performance and readability when creating empty dictionaries.

Suggested change
process_incr_kwargs: dict[str, Any] = dict()
process_incr_kwargs: dict[str, Any] = {}

Copilot uses AI. Check for mistakes.
if model_key is None:
metrics.incr("buffer.process-incr.model-key-missing")
return process_incr_kwargs
queue = pending_buffers_router.queue(model_key=model_key)
if queue is not None:
process_incr_kwargs["queue"] = queue
metrics.incr("buffer.process-incr-queue", tags={"queue": queue})
else:
metrics.incr("buffer.process-incr-default-queue")
return process_incr_kwargs

try:
keycount = 0
if is_instance_redis_cluster(self.cluster, self.is_redis_cluster):
keys = self.cluster.zrange(self.pending_key, 0, -1)
keys: list[str] = self.cluster.zrange(self.pending_key, 0, -1)
keycount += len(keys)

for key in keys:
pending_buffer.append(key)
model_key = self._extract_model_from_key(key=key)
pending_buffer = pending_buffers_router.get_pending_buffer(model_key=model_key)
pending_buffer.append(item=key)
if pending_buffer.full():
process_incr_kwargs = _generate_process_incr_kwargs(model_key=model_key)
process_incr.apply_async(
kwargs={"batch_keys": pending_buffer.flush()},
headers={"sentry-propagate-traces": False},
**process_incr_kwargs,
)

self.cluster.zrem(self.pending_key, *keys)
Expand All @@ -425,22 +566,37 @@ def process_pending(self) -> None:
continue
keycount += len(keysb)
for keyb in keysb:
pending_buffer.append(keyb.decode("utf-8"))
key = keyb.decode("utf-8")
model_key = self._extract_model_from_key(key=key)
pending_buffer = pending_buffers_router.get_pending_buffer(
model_key=model_key
)
pending_buffer.append(item=key)
if pending_buffer.full():
process_incr_kwargs = _generate_process_incr_kwargs(
model_key=model_key
)
process_incr.apply_async(
kwargs={"batch_keys": pending_buffer.flush()},
headers={"sentry-propagate-traces": False},
**process_incr_kwargs,
)
conn.target([host_id]).zrem(self.pending_key, *keysb)
else:
raise AssertionError("unreachable")

# queue up remainder of pending keys
if not pending_buffer.empty():
process_incr.apply_async(
kwargs={"batch_keys": pending_buffer.flush()},
headers={"sentry-propagate-traces": False},
)
# process any non-empty pending buffers
for pending_buffer_value in pending_buffers_router.pending_buffers():
pending_buffer = pending_buffer_value.pending_buffer
model_key = pending_buffer_value.model_key

if not pending_buffer.empty():
process_incr_kwargs = _generate_process_incr_kwargs(model_key=model_key)
process_incr.apply_async(
kwargs={"batch_keys": pending_buffer.flush()},
headers={"sentry-propagate-traces": False},
**process_incr_kwargs,
)

metrics.distribution("buffer.pending-size", keycount)
finally:
Expand Down
8 changes: 7 additions & 1 deletion tests/sentry/buffer/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
from django.utils import timezone

from sentry import options
from sentry.buffer.redis import BufferHookEvent, RedisBuffer, redis_buffer_registry
from sentry.buffer.redis import (
BufferHookEvent,
RedisBuffer,
_get_model_key,
redis_buffer_registry,
redis_buffer_router,
)
from sentry.models.group import Group
from sentry.models.project import Project
from sentry.rules.processing.delayed_processing import process_delayed_alert_conditions
Expand Down
Loading