
Conversation

@carlhoerberg
Member

Summary

  • Implements Kafka protocol support for producers, mapping Kafka topics 1:1 to LavinMQ stream queues
  • Follows the existing MQTT implementation pattern
  • Adds new src/lavinmq/kafka/ module with protocol handling

Features

  • ApiVersions API - Clients discover supported API versions
  • Metadata API - Clients query broker info and topic (stream queue) metadata
  • Produce API - Clients send messages that are stored in stream queues
  • Auto-create topics - Stream queues are auto-created when producing to non-existent topics
  • Single partition - Topics have partition 0 only (simplifies implementation)
  • Topic = Stream Queue - Direct 1:1 mapping by name (sketched just below this list)
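
A minimal sketch of the 1:1 topic-to-stream-queue mapping and the auto-create behaviour; StreamQueue and TopicRegistry below are illustrative stand-ins, not the actual LavinMQ classes:

class StreamQueue
  getter name : String

  def initialize(@name)
  end
end

class TopicRegistry
  def initialize(@auto_create : Bool = true)
    @queues = Hash(String, StreamQueue).new
  end

  # A Kafka topic name maps directly to a stream queue of the same name;
  # producing to an unknown topic creates the queue when auto_create is on.
  def for_topic(topic : String) : StreamQueue?
    @queues[topic]? || (@queues[topic] = StreamQueue.new(topic) if @auto_create)
  end
end

registry = TopicRegistry.new(auto_create: true)
registry.for_topic("orders") # auto-created stream queue named "orders"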

Configuration

[kafka]
bind = 127.0.0.1
port = 9092              # Enabled by default
tls_port = -1
unix_path =
default_vhost = /
auto_create_topics = true

Limitations (by design for initial implementation)

  • Producer only (no consumer support yet)
  • No authentication (SASL PLAIN can be added later)
  • Single partition only
  • No compression support in record batches
  • No transactional/idempotent producer support

Test plan

  • All existing tests pass (1184 examples, 0 failures)
  • New protocol tests for ApiVersions, Metadata, and Produce
  • Manual testing with Python Kafka client

🤖 Generated with Claude Code

Implements Kafka protocol support for producers, mapping Kafka topics 1:1
to LavinMQ stream queues. This follows the existing MQTT implementation
pattern.

Features:
- ApiVersions API - clients discover supported API versions
- Metadata API - clients query broker info and topic metadata
- Produce API - clients send messages stored in stream queues
- Auto-create topics as stream queues on first publish
- Single partition (0) per topic for simplicity
- Port 9092 enabled by default

New files:
- src/lavinmq/kafka/ - Kafka protocol module
  - consts.cr - API keys, error codes
  - protocol.cr - Wire protocol encoding/decoding
  - record_batch.cr - RecordBatch parsing
  - broker.cr - Per-vhost topic management
  - brokers.cr - VHost-aware broker registry
  - connection_factory.cr - Connection handling
  - client.cr - Request handling with read_loop
- spec/kafka/ - Protocol tests

Configuration:
  [kafka]
  port = 9092
  bind = 127.0.0.1
  auto_create_topics = true
  default_vhost = /

Limitations (by design):
- Producer only (no consumer support yet)
- No authentication (SASL can be added later)
- No compression support
- Single partition only

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
@claude

claude bot commented Nov 27, 2025

Claude finished @carlhoerberg's task.


Code Review - Issues Found

I've reviewed the Kafka producer protocol implementation and found several critical issues that need fixing:

Bugs

1. Thread Safety Issue in RecordBatch Parsing - src/lavinmq/kafka/record_batch.cr:12

@@headers_reuse_buffer = Hash(String, Bytes).new

This class variable is shared across all parsing calls and lacks synchronization. The code even has a comment acknowledging it's "not thread-safe for concurrent parsing" but doesn't address the issue. In a high-performance message broker, this could cause data corruption when multiple connections parse records simultaneously.
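
One possible fix, sketched with illustrative names (not the actual class): make the buffer per-instance state, which also lets it be pre-sized (see point 6 below). Alternatively the shared buffer could be guarded with a Mutex.

class RecordBatchParser
  # Per-connection buffer instead of a shared @@class variable: no data race,
  # and it can be pre-sized for typical header counts.
  @headers_buffer = Hash(String, Bytes).new(initial_capacity: 8)

  def parse_headers(io : IO) : Hash(String, Bytes)
    @headers_buffer.clear
    # ... read header key/value pairs from io into @headers_buffer ...
    @headers_buffer
  end
end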

2. Missing CRC Validation - src/lavinmq/kafka/record_batch.cr:134,239
The code reads the CRC field but never validates it against the batch data. This is a serious protocol compliance issue that could allow corrupted data to be processed silently:

crc = io.read_bytes(UInt32, ::IO::ByteFormat::BigEndian)
# CRC is read but never validated
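
The shape of the missing check, as a sketch. Note that Kafka's RecordBatch CRC is CRC-32C (Castagnoli), which Crystal's stdlib Digest::CRC32 does not implement, so the crc32c callback below is a placeholder for a shard or hand-rolled implementation:

def validate_batch_crc(io : IO, batch_remaining : Int32, crc32c : Bytes -> UInt32) : Bytes
  expected = io.read_bytes(UInt32, IO::ByteFormat::BigEndian)
  body = Bytes.new(batch_remaining)
  io.read_fully(body)                 # attributes .. end of batch
  actual = crc32c.call(body)
  if actual != expected
    raise IO::Error.new("RecordBatch CRC mismatch: expected #{expected}, got #{actual}")
  end
  body
end

Buffering the rest of the batch like this conflicts with the streaming parser introduced later in the PR, so in practice the CRC would likely have to be computed incrementally as records are read.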

3. Request Size Validation Logic Error - src/lavinmq/kafka/protocol.cr:25-27

if slice.size > @bytes_remaining
  raise MaxRequestSizeError.new("Attempt to read beyond request size")
end

This check occurs in the read() method but doesn't handle partial reads correctly. If io.read() returns fewer bytes than requested, @bytes_remaining becomes incorrectly negative.
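
A sketch of one fix, assuming @io is the underlying socket and @bytes_remaining is an unsigned counter of bytes left in the request: decrement by the bytes actually read rather than the bytes requested.

def read(slice : Bytes) : Int32
  if slice.size > @bytes_remaining
    raise MaxRequestSizeError.new("Attempt to read beyond request size")
  end
  count = @io.read(slice)          # may be a short read
  @bytes_remaining -= count.to_u32 # account only for what was consumed
  count
end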

4. Resource Leak in Error Conditions - src/lavinmq/kafka/record_batch.cr:277-301
Decompression creates IO objects that may not be properly closed in error scenarios:

decompressed = ::IO::Memory.new
Compress::Gzip::Reader.open(::IO::Memory.new(data)) do |gzip|
  ::IO.copy(gzip, decompressed)
end

Performance Issues

5. Inefficient String Conversion - src/lavinmq/kafka/client.cr:385,393

@kafka_headers_hash[key] = String.new(value)  # Creates string copy
@kafka_headers_hash["kafka-key"] = String.new(key) # Another copy

These String.new(Bytes) calls create unnecessary memory copies in the hot path for every message.

6. Reused Hash Not Pre-sized - src/lavinmq/kafka/record_batch.cr:12
The reused headers buffer should have initial_capacity set to avoid rehashing for typical header counts.

Security Issues

7. DoS via Compression Bomb - src/lavinmq/kafka/record_batch.cr:277-301
No limits on decompressed data size. Malicious clients could send small compressed payloads that expand to gigabytes, consuming all server memory.
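
A sketch of one mitigation for the gzip case, capping how far a batch may expand; MAX_DECOMPRESSED_SIZE is an assumed constant, not part of the current code:

require "compress/gzip"

MAX_DECOMPRESSED_SIZE = 128 * 1024 * 1024 # assumed cap, e.g. 128 MiB

def decompress_gzip(data : Bytes) : Bytes
  output = IO::Memory.new
  Compress::Gzip::Reader.open(IO::Memory.new(data)) do |gzip|
    copied = IO.copy(gzip, output, MAX_DECOMPRESSED_SIZE)
    # Hitting the cap with data still pending means the payload is too large
    if copied == MAX_DECOMPRESSED_SIZE && gzip.read_byte
      raise IO::Error.new("Decompressed record batch exceeds #{MAX_DECOMPRESSED_SIZE} bytes")
    end
  end
  output.to_slice
end

The same cap would need to apply to the LZ4 path.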

8. Memory Exhaustion via Large Arrays - src/lavinmq/kafka/protocol.cr:142-144

Array.new(length) { yield }

No bounds checking on array lengths read from network. Malicious clients could request arrays with length = 2**31 - 1.
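
A sketch of a bounded array reader, assuming it lives in the Protocol class (which is an IO) and reuses the @bytes_remaining counter and MaxRequestSizeError from this PR; MAX_ARRAY_LENGTH is an assumed constant:

MAX_ARRAY_LENGTH = 100_000 # assumed sanity limit

def read_array(& : -> T) : Array(T) forall T
  length = read_bytes(Int32, IO::ByteFormat::NetworkEndian)
  return Array(T).new if length <= 0 # Kafka encodes null arrays as -1
  if length > MAX_ARRAY_LENGTH || length > @bytes_remaining
    raise MaxRequestSizeError.new("Array length #{length} exceeds request bounds")
  end
  Array(T).new(length) { yield }
end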

Crystal Anti-patterns

9. Class Variable for Instance State - src/lavinmq/kafka/record_batch.cr:12
Using @@ for what should be instance or fiber-local state violates Crystal conventions and creates the thread safety issue.

10. Missing Error Types - src/lavinmq/kafka/record_batch.cr:297
Generic Error.new() instead of specific exception types makes error handling difficult for callers.


carlhoerberg and others added 15 commits December 1, 2025 22:33
- Prefix unused variables with underscore
- Rename predicate methods following Crystal conventions (is_* → *)
- Use getter? for boolean properties
- Simplify type filtering with .select(Type)
- Remove redundant return statements
- Apply code formatting

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
as craft is required for specs
Drop the 'is_' prefix from the boolean property since it already has
the '?' suffix, following Crystal naming conventions.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Add size validation to prevent memory exhaustion attacks:

Security improvements:
- Validate request size before reading (reject negative/oversized requests)
- Add MAX_REQUEST_SIZE limit (1 MiB)
- Track remaining bytes per request using UInt32 (prevents negative values)
- Account for all bytes consumed during parsing (primitives + fields)
- Automatic overflow detection: any underflow raises OverflowError, caught and reraised as MaxRequestSizeError

This prevents attacks where malicious clients send:
- Negative size values
- Extremely large size values causing OOM
- Cumulative field sizes exceeding request boundaries

Error hierarchy:
- Protocol::Error < IO::Error (base for all protocol errors)
- Protocol::MaxRequestSizeError < Protocol::Error (size limit violations)

The UInt32 @bytes_remaining counter provides automatic protocol violation
detection - any underflow attempt raises OverflowError, immediately
terminating the connection with a proper error message.

Test coverage:
- Spec for field size exceeding advertised request size (overflow detection)
- Spec for request size exceeding MAX_REQUEST_SIZE limit
Both specs verify the connection is closed on violation.

Implementation inspired by amq-protocol.cr's Stream class approach
to frame size accounting and validation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
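
A condensed sketch of the pieces this commit describes: the exception hierarchy and the UInt32 counter whose checked arithmetic turns any over-read into a protocol error. Names and method bodies are illustrative.

class Protocol
  class Error < IO::Error; end
  class MaxRequestSizeError < Error; end

  MAX_REQUEST_SIZE = 1024 * 1024 # 1 MiB (raised to 128 MiB later in this PR)

  @bytes_remaining : UInt32 = 0_u32

  # Checked UInt32 arithmetic: consuming more bytes than the request
  # advertised underflows and raises OverflowError, which is surfaced as a
  # protocol violation and terminates the connection.
  private def consume(count : Int32) : Nil
    @bytes_remaining -= count.to_u32
  rescue OverflowError
    raise MaxRequestSizeError.new("Field sizes exceed advertised request size")
  end
end
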
Responses now self-serialize directly to the wire without intermediate
IO::Memory allocation. Each Response struct implements bytesize and to_io
methods, calculating exact packet size upfront and writing in a single pass.

Changes:
- Add SerializationHelpers module with size calculation and write methods
- Implement bytesize and to_io in all Response types
- Move api_version into constructors of nested structs for cleaner API
- Simplify write_response to single method call
- Reduce memory allocation and improve serialization performance

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
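
A sketch of what the bytesize/to_io pattern looks like for a trivial response; the field set and helper names are illustrative, not the actual Response structs:

module SerializationHelpers
  def sizeof_string(str : String) : Int32
    2 + str.bytesize # Int16 length prefix + bytes
  end

  def write_kafka_string(io : IO, str : String) : Nil
    io.write_bytes(str.bytesize.to_i16, IO::ByteFormat::NetworkEndian)
    io.write(str.to_slice)
  end
end

struct ExampleResponse
  include SerializationHelpers

  def initialize(@correlation_id : Int32, @error_message : String)
  end

  # Exact payload size is known up front, so the length prefix and payload
  # can be written in one pass with no intermediate IO::Memory
  def bytesize : Int32
    4 + sizeof_string(@error_message) # correlation_id + error_message
  end

  def to_io(io : IO) : Nil
    io.write_bytes(bytesize, IO::ByteFormat::NetworkEndian) # length prefix
    io.write_bytes(@correlation_id, IO::ByteFormat::NetworkEndian)
    write_kafka_string(io, @error_message)
  end
end
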
Implements decompression for compressed Kafka record batches, enabling
the server to handle Gzip and LZ4 compressed messages from producers.

Changes:
- Add decompress method supporting Gzip and LZ4 compression types
- Replace skipped compressed batches with proper decompression logic
- Calculate compressed data size and decompress before parsing records
- Add comprehensive tests for uncompressed, gzip, and lz4 batches
- Raise clear errors for unsupported compression (Snappy, Zstd)

Uses Crystal's built-in Compress::Gzip and the existing lz4 shard
dependency. All existing tests pass with new compression functionality.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Parse RecordBatch directly from IO stream using IO::Sized wrapper instead
of allocating intermediate Bytes arrays. This eliminates memory copies and
reduces allocations during request parsing.

- Change RecordBatch.parse to accept IO and length parameters
- Use IO::Sized to safely limit reading to record set boundaries
- Replace Bytes allocation + String.new with io.read_string for header keys
- Parse batches during Protocol.read_request, storing in PartitionData

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
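
A sketch of the IO::Sized plus read_string pattern; names are illustrative, and the real record header key length is a varint, simplified here to a fixed Int16 to keep the sketch short:

def read_header_key(io : IO, record_set_length : Int32) : String
  # IO::Sized returns EOF once record_set_length bytes are consumed, so a
  # malformed batch cannot read into the next request's bytes.
  sized = IO::Sized.new(io, read_size: record_set_length)
  key_length = sized.read_bytes(Int16, IO::ByteFormat::NetworkEndian)
  sized.read_string(key_length) # no intermediate Bytes + String.new copy
end
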
Make Protocol inherit from IO and override read() to handle @bytes_remaining
accounting automatically. This simplifies the codebase by eliminating manual
byte tracking from individual read methods and allows Protocol to be passed
directly to methods expecting an IO interface.

Changes:
- Protocol now inherits from IO with read/write/close implementations
- Byte tracking centralized in read() method with bounds checking
- RecordBatch.parse now receives Protocol instance instead of raw IO
- Standardized on NetworkEndian (equivalent to BigEndian, more semantic)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Fix ProduceRequest v3+ parsing to read transactional_id field
- Add ApiVersions v3+ flexible version support with compact arrays and tagged fields
- Implement unsigned varint encoding/decoding for flexible protocol versions
- Increase MAX_REQUEST_SIZE from 1 MiB to 128 MiB for larger batches
- Add response version capping to prevent unsupported version responses
- Fix MetadataResponse cluster_id field to be v2+ (was incorrectly v1+)
- Add Kafka::Protocol::Error exception handling in client read loop
- Change Metadata handler to auto-create streams instead of just getting them

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
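
A sketch of the unsigned varint wire format mentioned above (7 data bits per byte, high bit set while more bytes follow), as it is commonly implemented; the exact helpers in this PR may differ:

def write_unsigned_varint(io : IO, value : UInt32) : Nil
  loop do
    byte = (value & 0x7f_u32).to_u8
    value >>= 7
    if value.zero?
      io.write_byte(byte)
      break
    else
      io.write_byte(byte | 0x80_u8) # more bytes follow
    end
  end
end

def read_unsigned_varint(io : IO) : UInt32
  result = 0_u32
  shift = 0
  loop do
    byte = io.read_byte || raise IO::EOFError.new
    result |= (byte & 0x7f_u8).to_u32 << shift
    return result if (byte & 0x80_u8).zero?
    shift += 7
    raise IO::Error.new("Unsigned varint is too long") if shift > 28
  end
end
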
Remove 10 unused methods that were identified by Crystal's unreachable code
analysis:
- read_int8, read_int64: unused primitive readers
- read_bytes: unused bytes reader
- read_unsigned_varint, read_compact_string, read_compact_nullable_string,
  skip_tagged_fields: flexible protocol helpers not yet needed
- write_string, write_nullable_string, write_array: unused writer wrappers

These methods were added in preparation for flexible protocol versions (v9+)
but are not currently used. They can be re-added when implementing those
features.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
…~98%

This commit implements comprehensive allocation reduction optimizations for the
Kafka ProduceRequest processing pipeline, reducing heap allocations from ~600+
to ~10-15 for a typical 100-record batch.

Key optimizations:

1. Streaming response generation - Responses are written directly to the protocol
   IO without building intermediate arrays of TopicProduceResponse or
   PartitionProduceResponse objects

2. Reusable hash tables - Added class-level @@headers_reuse_buffer in RecordBatch
   and instance-level @kafka_headers_hash in Client that are cleared and reused
   for each record, eliminating per-record hash allocations

3. Direct BytesMessage creation - Records are converted directly to BytesMessage
   using socket bytes without IO::Memory wrappers

4. Batch lock acquisition - Stream.publish_batch holds the message store lock
   once per topic instead of per message, with a single consumer notification
   per batch. Uses generator pattern where block returns messages one at a time

5. Manual byte tracking - Eliminated IO::Sized class allocations by manually
   tracking bytes remaining during RecordBatch parsing

6. Efficient yielding - Changed from block.call to direct yield for better
   performance

Performance impact:
- For 100-record batch with 2 headers each: ~97-98% reduction in heap allocations
- Reduced lock contention through batch operations
- Maintained protocol correctness with all tests passing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
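
A self-contained sketch of the batch-publish shape described in point 4 above; StreamSketch and its internals are illustrative, not LavinMQ's actual Stream or message store code:

class StreamSketch
  def initialize
    @lock = Mutex.new
    @messages = [] of String
  end

  # Hold the lock once for the whole batch; the block acts as a generator,
  # returning the next message or nil when the batch is exhausted.
  def publish_batch(& : -> String?) : Int32
    count = 0
    @lock.synchronize do
      while msg = yield
        @messages << msg
        count += 1
      end
    end
    notify_consumers # single consumer notification per batch
    count
  end

  private def notify_consumers
  end
end
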
…es following messages based on that. saves us from having to write "x-stream-offset":i64 for each message
