11 changes: 11 additions & 0 deletions CLAUDE.md
@@ -445,6 +445,17 @@ for org in organizations:

# RIGHT: Use prefetch_related
organizations.prefetch_related('projects')

# WRONG: Use hasattr() for unions
x: str | None = "hello"
if hasattr(x, "replace"):
x = x.replace("e", "a")

# RIGHT: Use isinstance()
x: str | None = "hello"
if isinstance(x, str):
x = x.replace("e", "a")

```
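
The rationale, briefly: static type checkers treat `isinstance()` as a type guard that narrows a union, while `hasattr()` generally does not narrow it (mypy in particular). A minimal sketch of what mypy reports in each branch — illustrative only, not part of CLAUDE.md:

```python
x: str | None = "hello"

if hasattr(x, "replace"):
    # mypy still sees `x` as `str | None` here, so this line errors:
    # error: Item "None" of "str | None" has no attribute "replace"
    x = x.replace("e", "a")

if isinstance(x, str):
    x = x.replace("e", "a")  # ok: `x` is narrowed to `str`
```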

### Frontend
10 changes: 9 additions & 1 deletion src/sentry/consumers/__init__.py
@@ -427,7 +427,15 @@ def ingest_transactions_options() -> list[click.Option]:
"topic": Topic.INGEST_SPANS,
"dlq_topic": Topic.INGEST_SPANS_DLQ,
"strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
"click_options": multiprocessing_options(default_max_batch_size=100),
"click_options": [
*multiprocessing_options(default_max_batch_size=100),
click.Option(
["--flusher-processes", "flusher_processes"],
default=1,
type=int,
help="Maximum number of processes for the span flusher. Defaults to 1.",
),
],
},
"process-segments": {
"topic": Topic.BUFFERED_SEGMENTS,
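For illustration, a hedged sketch of how a click option like the new `--flusher-processes` flag is parsed and handed onward. The actual wiring that builds the strategy factory from this consumer definition lives elsewhere in `sentry.consumers` and is assumed here; the command below is a hypothetical stand-in:

```python
import click

@click.command()
@click.option(
    "--flusher-processes",
    "flusher_processes",
    default=1,
    type=int,
    help="Maximum number of processes for the span flusher.",
)
def run_consumer(flusher_processes: int) -> None:
    # In the real bootstrap, this value would be forwarded as
    # ProcessSpansStrategyFactory(flusher_processes=flusher_processes, ...).
    click.echo(f"flusher_processes={flusher_processes}")

if __name__ == "__main__":
    run_consumer()
```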
3 changes: 3 additions & 0 deletions src/sentry/spans/consumers/process/factory.py
@@ -38,6 +38,7 @@ def __init__(
num_processes: int,
input_block_size: int | None,
output_block_size: int | None,
flusher_processes: int | None = None,
produce_to_pipe: Callable[[KafkaPayload], None] | None = None,
):
super().__init__()
@@ -48,6 +49,7 @@ def __init__(
self.input_block_size = input_block_size
self.output_block_size = output_block_size
self.num_processes = num_processes
self.flusher_processes = flusher_processes
self.produce_to_pipe = produce_to_pipe

if self.num_processes != 1:
@@ -69,6 +71,7 @@ def create_with_partitions(
flusher = self._flusher = SpanFlusher(
buffer,
next_step=committer,
max_processes=self.flusher_processes,
produce_to_pipe=self.produce_to_pipe,
)

174 changes: 127 additions & 47 deletions src/sentry/spans/consumers/process/flusher.py
@@ -15,6 +15,7 @@

from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.processing.backpressure.memory import ServiceMemory
from sentry.spans.buffer import SpansBuffer
from sentry.utils import metrics
from sentry.utils.arroyo import run_with_initialized_sentry
@@ -27,7 +28,8 @@

class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
"""
A background thread that polls Redis for new segments to flush and to produce to Kafka.
A background multiprocessing manager that polls Redis for new segments to flush and to produce to Kafka.
Creates one process per shard for parallel processing.

This is a processing step to be embedded into the consumer that writes to
Redis. It takes and forwards integer messages that represent recently
@@ -42,27 +44,53 @@ def __init__(
self,
buffer: SpansBuffer,
next_step: ProcessingStrategy[FilteredPayload | int],
max_processes: int | None = None,
produce_to_pipe: Callable[[KafkaPayload], None] | None = None,
):
self.buffer = buffer
self.next_step = next_step
self.max_processes = max_processes or len(buffer.assigned_shards)

self.mp_context = mp_context = multiprocessing.get_context("spawn")
self.stopped = mp_context.Value("i", 0)
self.redis_was_full = False
self.current_drift = mp_context.Value("i", 0)
self.backpressure_since = mp_context.Value("i", 0)
self.healthy_since = mp_context.Value("i", 0)
self.process_restarts = 0
self.produce_to_pipe = produce_to_pipe

self._create_process()

def _create_process(self):
# Determine which shards get their own processes vs shared processes
self.num_processes = min(self.max_processes, len(buffer.assigned_shards))
self.process_to_shards_map: dict[int, list[int]] = {
i: [] for i in range(self.num_processes)
}
for i, shard in enumerate(buffer.assigned_shards):
process_index = i % self.num_processes
self.process_to_shards_map[process_index].append(shard)

self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {}
self.process_healthy_since = {
process_index: mp_context.Value("i", int(time.time()))
for process_index in range(self.num_processes)
}
self.process_backpressure_since = {
process_index: mp_context.Value("i", 0) for process_index in range(self.num_processes)
}
self.process_restarts = {process_index: 0 for process_index in range(self.num_processes)}
self.buffers: dict[int, SpansBuffer] = {}

self._create_processes()

def _create_processes(self):
# Create processes based on shard mapping
for process_index, shards in self.process_to_shards_map.items():
self._create_process_for_shards(process_index, shards)

def _create_process_for_shards(self, process_index: int, shards: list[int]):
# Optimistically reset healthy_since to avoid a race between the
# starting process and the next flush cycle. Keep back pressure across
# the restart, however.
self.healthy_since.value = int(time.time())
self.process_healthy_since[process_index].value = int(time.time())

# Create a buffer for these specific shards
shard_buffer = SpansBuffer(shards)
Comment on lines +92 to +93

Copilot AI (Jul 26, 2025): [nitpick] Creating multiple SpansBuffer instances for different shards may lead to resource duplication. Consider if the original buffer can be shared or if there's a more efficient way to partition the work.

Suggested change:
- # Create a buffer for these specific shards
- shard_buffer = SpansBuffer(shards)
+ # Use the shared buffer for all shards
+ shard_buffer = self.buffer
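
To make the mapping above concrete, a standalone sketch of the round-robin shard assignment with illustrative values (five shards, `--flusher-processes 2`):

```python
assigned_shards = [0, 1, 2, 3, 4]  # illustrative shard ids
max_processes = 2                  # e.g. --flusher-processes 2
num_processes = min(max_processes, len(assigned_shards))

# Each shard goes to process (index % num_processes), as in __init__ above.
process_to_shards_map: dict[int, list[int]] = {i: [] for i in range(num_processes)}
for i, shard in enumerate(assigned_shards):
    process_to_shards_map[i % num_processes].append(shard)

print(process_to_shards_map)  # {0: [0, 2, 4], 1: [1, 3]}
```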

make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread]
if self.produce_to_pipe is None:
Expand All @@ -72,37 +100,50 @@ def _create_process(self):
# pickled separately. at the same time, pickling
# synchronization primitives like multiprocessing.Value can
# only be done by the Process
self.buffer,
shard_buffer,
)
make_process = self.mp_context.Process
else:
target = partial(SpanFlusher.main, self.buffer)
target = partial(SpanFlusher.main, shard_buffer)
make_process = threading.Thread

self.process = make_process(
process = make_process(
target=target,
args=(
shards,
self.stopped,
self.current_drift,
self.backpressure_since,
self.healthy_since,
self.process_backpressure_since[process_index],
self.process_healthy_since[process_index],
self.produce_to_pipe,
),
daemon=True,
)

self.process.start()
process.start()
self.processes[process_index] = process
self.buffers[process_index] = shard_buffer

def _create_process_for_shard(self, shard: int):
# Find which process this shard belongs to and restart that process
for process_index, shards in self.process_to_shards_map.items():
if shard in shards:
self._create_process_for_shards(process_index, shards)
break
Comment on lines +127 to +132

Copilot AI (Jul 26, 2025): The method _create_process_for_shard appears to be unused and may be dead code. If it's intended for future use, consider adding a comment explaining its purpose or removing it if not needed.

Suggested change:
- def _create_process_for_shard(self, shard: int):
-     # Find which process this shard belongs to and restart that process
-     for process_index, shards in self.process_to_shards_map.items():
-         if shard in shards:
-             self._create_process_for_shards(process_index, shards)
-             break
+ # Removed the `_create_process_for_shard` method as it is unused and redundant.

@staticmethod
def main(
buffer: SpansBuffer,
shards: list[int],
stopped,
current_drift,
backpressure_since,
healthy_since,
produce_to_pipe: Callable[[KafkaPayload], None] | None,
) -> None:
shard_tag = ",".join(map(str, shards))
Comment on lines +137 to +144

Copilot AI (Jul 26, 2025): [nitpick] The main method signature has grown complex with 7 parameters. Consider grouping related parameters into a configuration object or data class to improve maintainability and readability.

Suggested change:
- shards: list[int],
- stopped,
- current_drift,
- backpressure_since,
- healthy_since,
- produce_to_pipe: Callable[[KafkaPayload], None] | None,
- ) -> None:
-     shard_tag = ",".join(map(str, shards))
+ config: "FlusherConfig",
+ ) -> None:
+     shard_tag = ",".join(map(str, config.shards))
sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher")
sentry_sdk.set_tag("sentry_spans_buffer_shards", shard_tag)

try:
producer_futures = []
@@ -134,23 +175,28 @@ def produce(payload: KafkaPayload) -> None:
else:
backpressure_since.value = 0

# Update healthy_since for all shards handled by this process
healthy_since.value = system_now

if not flushed_segments:
time.sleep(1)
continue

with metrics.timer("spans.buffer.flusher.produce"):
for _, flushed_segment in flushed_segments.items():
with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
for flushed_segment in flushed_segments.values():
Copilot AI (Jul 26, 2025): The underscore variable _ was removed from the tuple unpacking. This change from for _, flushed_segment in flushed_segments.items() to for flushed_segment in flushed_segments.values() is correct, but the commit should verify that the key is indeed not needed elsewhere.

Suggested change:
- for flushed_segment in flushed_segments.values():
+ for _, flushed_segment in flushed_segments.items():
if not flushed_segment.spans:
continue

spans = [span.payload for span in flushed_segment.spans]
kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
metrics.timing(
"spans.buffer.segment_size_bytes",
len(kafka_payload.value),
tags={"shard": shard_tag},
)
produce(kafka_payload)

with metrics.timer("spans.buffer.flusher.wait_produce"):
with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
Copilot AI (Jul 26, 2025): Inconsistent tag naming: line 185 uses "shard" while line 199 uses "shards" for the same data. This should be consistent throughout the codebase.

Suggested change:
- with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+ with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
for future in producer_futures:
future.result()

@@ -169,46 +215,71 @@ def produce(payload: KafkaPayload) -> None:
def poll(self) -> None:
self.next_step.poll()

def _ensure_process_alive(self) -> None:
def _ensure_processes_alive(self) -> None:
max_unhealthy_seconds = options.get("spans.buffer.flusher.max-unhealthy-seconds")
if not self.process.is_alive():
exitcode = getattr(self.process, "exitcode", "unknown")
cause = f"no_process_{exitcode}"
elif int(time.time()) - self.healthy_since.value > max_unhealthy_seconds:
cause = "hang"
else:
return # healthy

metrics.incr("spans.buffer.flusher_unhealthy", tags={"cause": cause})
if self.process_restarts > MAX_PROCESS_RESTARTS:
raise RuntimeError(f"flusher process crashed repeatedly ({cause}), restarting consumer")
for process_index, process in self.processes.items():
if not process:
continue

shards = self.process_to_shards_map[process_index]

cause = None
if not process.is_alive():
exitcode = getattr(process, "exitcode", "unknown")
cause = f"no_process_{exitcode}"
elif (
int(time.time()) - self.process_healthy_since[process_index].value
> max_unhealthy_seconds
):
# Check if any shard handled by this process is unhealthy
cause = "hang"

if cause is None:
continue # healthy

# Report unhealthy for all shards handled by this process
for shard in shards:
metrics.incr(
"spans.buffer.flusher_unhealthy", tags={"cause": cause, "shard": shard}
)

try:
self.process.kill()
except ValueError:
pass # Process already closed, ignore
if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
raise RuntimeError(
f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
)
self.process_restarts[process_index] += 1

self.process_restarts += 1
self._create_process()
try:
if isinstance(process, multiprocessing.Process):
process.kill()
except (ValueError, AttributeError):
pass # Process already closed, ignore

self._create_process_for_shards(process_index, shards)
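
The health check above distinguishes two failure causes per process: a dead process (reported via its exit code) and a hang (no heartbeat within the configured window). A minimal standalone sketch of that classification, with an illustrative threshold in place of the `spans.buffer.flusher.max-unhealthy-seconds` option:

```python
import time

max_unhealthy_seconds = 60  # illustrative stand-in for the sentry option

def classify(alive: bool, exitcode: int | None, healthy_since: int) -> str | None:
    """Return a failure cause, or None if the process looks healthy."""
    if not alive:
        return f"no_process_{exitcode}"
    if int(time.time()) - healthy_since > max_unhealthy_seconds:
        return "hang"
    return None

print(classify(True, None, int(time.time()) - 120))  # "hang"
print(classify(False, 1, int(time.time())))          # "no_process_1"
```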

def submit(self, message: Message[FilteredPayload | int]) -> None:
# Note that submit is not actually a hot path. Their message payloads
# are mapped from *batches* of spans, and there are a handful of spans
# per second at most. If anything, self.poll() might even be called
# more often than submit()

self._ensure_process_alive()
self._ensure_processes_alive()

self.buffer.record_stored_segments()
for buffer in self.buffers.values():
buffer.record_stored_segments()

# We pause insertion into Redis if the flusher is not making progress
# fast enough. We could backlog into Redis, but we assume, despite best
# efforts, it is still always going to be less durable than Kafka.
# Minimizing our Redis memory usage also makes COGS easier to reason
# about.
if self.backpressure_since.value > 0:
backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds")
if int(time.time()) - self.backpressure_since.value > backpressure_secs:
backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds")
for backpressure_since in self.process_backpressure_since.values():
if (
backpressure_since.value > 0
and int(time.time()) - backpressure_since.value > backpressure_secs
):
metrics.incr("spans.buffer.flusher.backpressure")
raise MessageRejected()
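
A condensed sketch of the per-process backpressure rule above: reject incoming messages once any flusher process has been backlogged longer than the configured window (values here are illustrative):

```python
import time

backpressure_secs = 10  # illustrative stand-in for the sentry option
# process 0 is fine; process 1 has been backlogged for 30 seconds
process_backpressure_since = {0: 0, 1: int(time.time()) - 30}

def should_reject() -> bool:
    now = int(time.time())
    return any(
        since > 0 and now - since > backpressure_secs
        for since in process_backpressure_since.values()
    )

print(should_reject())  # True: process 1 exceeded the window
```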

Expand All @@ -225,7 +296,9 @@ def submit(self, message: Message[FilteredPayload | int]) -> None:
# wait until the situation is improved manually.
max_memory_percentage = options.get("spans.buffer.max-memory-percentage")
if max_memory_percentage < 1.0:
memory_infos = list(self.buffer.get_memory_info())
memory_infos: list[ServiceMemory] = []
for buffer in self.buffers.values():
memory_infos.extend(buffer.get_memory_info())
used = sum(x.used for x in memory_infos)
available = sum(x.available for x in memory_infos)
if available > 0 and used / available > max_memory_percentage:
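
The memory guard now sums usage across every per-shard buffer before comparing against the threshold. A self-contained sketch, where the dataclass is a stand-in with the `used`/`available` fields assumed from `sentry.processing.backpressure.memory.ServiceMemory`:

```python
from dataclasses import dataclass

@dataclass
class ServiceMemory:  # stand-in for the real class
    used: int
    available: int

memory_infos = [ServiceMemory(700, 1000), ServiceMemory(250, 1000)]  # two buffers
max_memory_percentage = 0.4  # illustrative option value

used = sum(m.used for m in memory_infos)
available = sum(m.available for m in memory_infos)
if available > 0 and used / available > max_memory_percentage:
    print("Redis deemed full: flusher pauses and consumer rejects messages")
```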
@@ -253,15 +326,22 @@ def close(self) -> None:
self.next_step.close()

def join(self, timeout: float | None = None):
# set stopped flag first so we can "flush" the background thread while
# set stopped flag first so we can "flush" the background threads while
# next_step is also shutting down. we can do two things at once!
self.stopped.value = True
deadline = time.time() + timeout if timeout else None

self.next_step.join(timeout)

while self.process.is_alive() and (deadline is None or deadline > time.time()):
time.sleep(0.1)
# Wait for all processes to finish
for process_index, process in self.processes.items():
if deadline is not None:
remaining_time = deadline - time.time()
if remaining_time <= 0:
break

while process.is_alive() and (deadline is None or deadline > time.time()):
time.sleep(0.1)

if isinstance(self.process, multiprocessing.Process):
self.process.terminate()
if isinstance(process, multiprocessing.Process):
process.terminate()
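
Finally, a runnable sketch of the deadline-bounded shutdown loop: all processes share one time budget, and anything still alive at the deadline is terminated (timings are illustrative):

```python
import multiprocessing
import time

def worker() -> None:
    time.sleep(5)  # simulates a flusher that has not finished draining

if __name__ == "__main__":
    procs = [multiprocessing.Process(target=worker, daemon=True) for _ in range(2)]
    for p in procs:
        p.start()

    deadline = time.time() + 1.0  # one shared budget across all processes
    for p in procs:
        while p.is_alive() and deadline > time.time():
            time.sleep(0.1)

    for p in procs:
        if p.is_alive():
            p.terminate()  # force-stop whatever missed the deadline
```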