🚀 6.3x faster than serde_json | 🎯 5.3x faster progressive loading | 💾 Bounded memory usage | 🏗️ Production Ready
New in v0.3.0: Production-ready code quality with zero clippy warnings, Clean Architecture compliance, and comprehensive test coverage (196 tests). Now requires nightly Rust for zero-cost abstractions.
Modern web applications face a fundamental challenge: large JSON responses block UI rendering.
- 📊 Analytics dashboard loads 5MB of JSON
- ⏱️ User waits 2-3 seconds seeing nothing
- 😤 User thinks app is broken and refreshes
- 🔄 The cycle repeats
| Solution | Problem |
|---|---|
| Pagination | Requires multiple round-trips, complex state management |
| GraphQL | Still sends the complete response, just a smaller one |
| JSON streaming | No semantic understanding, can't prioritize |
| Compression | Reduces size but not time-to-first-byte |
PJS revolutionizes JSON transmission by understanding your data semantically and prioritizing what matters.
```rust
#[derive(JsonPriority)]
struct UserDashboard {
    #[priority(critical)] // Sent in first 10ms
    user_id: u64,
    user_name: String,

    #[priority(high)] // Sent in next 50ms
    recent_activity: Vec<Activity>,
    notifications: Vec<Notification>,

    #[priority(low)] // Sent when available
    detailed_analytics: Analytics, // 4MB of data
}
```
```text
Traditional JSON Loading:
[████████████████████] 100% - 2000ms - Full UI renders

PJS Loading:
[██░░░░░░░░░░░░░░░░░░]  10% -   10ms - Critical UI visible
[██████░░░░░░░░░░░░░░]  30% -   50ms - Interactive UI
[████████████████████] 100% - 2000ms - Full data loaded
```
User Experience: ⚡ Instant → 😊 Happy
Production-ready Axum integration with full REST API, session management, and real-time streaming.
- AdaptiveFrameStream: Client capability-based optimization
- BatchFrameStream: High-throughput batch processing
- PriorityFrameStream: Priority-based frame ordering with buffering
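The priority-ordered delivery these streams provide can be sketched with a standard-library max-heap. This is a conceptual illustration only, not the actual `PriorityFrameStream` internals; the `Frame` type below is invented for the example:

```rust
use std::collections::BinaryHeap;

// Hypothetical frame type for illustration; the real StreamFrame differs.
// Deriving Ord compares `priority` first (field order), so the heap is a
// max-heap on priority.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Frame {
    priority: u8, // higher = more important
    payload: String,
}

// Buffer frames, then drain them highest-priority first.
fn drain_by_priority(frames: Vec<Frame>) -> Vec<Frame> {
    let mut heap: BinaryHeap<Frame> = frames.into_iter().collect();
    let mut ordered = Vec::new();
    while let Some(frame) = heap.pop() {
        ordered.push(frame);
    }
    ordered
}

fn main() {
    let frames = vec![
        Frame { priority: 10, payload: "analytics".into() },
        Frame { priority: 200, payload: "user_id".into() },
        Frame { priority: 100, payload: "recent_activity".into() },
    ];
    // Prints user_id, then recent_activity, then analytics.
    for frame in drain_by_priority(frames) {
        println!("{} -> {}", frame.priority, frame.payload);
    }
}
```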
Clean architecture with CQRS pattern, event sourcing, and ports & adapters for maximum testability and maintainability.
- Thread-safe in-memory storage and metrics collection
- Event publishing with subscription support
- Prometheus metrics integration
- Comprehensive middleware stack (CORS, security, compression)
Automatic format detection supporting JSON, NDJSON, and Server-Sent Events based on client Accept headers.
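A minimal sketch of what Accept-header-based detection might look like. This is illustrative only; the media types and fallback order are assumptions, not the crate's actual negotiation code:

```rust
// Map a client's Accept header to a streaming format.
// Assumed media types: text/event-stream (SSE), application/x-ndjson (NDJSON),
// with plain JSON as the fallback.
fn negotiate_format(accept: &str) -> &'static str {
    if accept.contains("text/event-stream") {
        "sse"
    } else if accept.contains("application/x-ndjson") {
        "ndjson"
    } else {
        "json"
    }
}

fn main() {
    println!("{}", negotiate_format("text/event-stream"));   // sse
    println!("{}", negotiate_format("application/x-ndjson")); // ndjson
    println!("{}", negotiate_format("application/json"));     // json
}
```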
Powered by sonic-rs for blazing-fast JSON processing with zero-copy operations.
Complete WebSocket implementation with priority-based frame delivery:
- Session Management: Track active WebSocket connections with metrics
- Priority-Based Delivery: Critical data sent first with adaptive delays
- Schema-Based Compression: Intelligent compression using multiple strategies
- Progressive Enhancement: Skeleton-first streaming with incremental updates
- Demo Servers: Interactive demonstrations of real-time streaming capabilities
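Skeleton-first streaming can be illustrated with a toy value type: the client renders a complete but empty structure immediately, then patches individual fields as prioritized frames arrive. The `Value` enum below is invented for the example; the real crate uses its own `JsonData` type:

```rust
use std::collections::BTreeMap;

// Toy JSON-like value for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Str(String),
    Object(BTreeMap<String, Value>),
}

// Patch one field in place as a frame for it arrives.
fn apply_patch(root: &mut Value, key: &str, new: Value) {
    if let Value::Object(map) = root {
        map.insert(key.to_string(), new);
    }
}

fn main() {
    // Skeleton: every field is present but empty, so the UI can render now.
    let mut dashboard = Value::Object(BTreeMap::from([
        ("user_name".into(), Value::Null),
        ("analytics".into(), Value::Null),
    ]));

    // Critical frame arrives first.
    apply_patch(&mut dashboard, "user_name", Value::Str("alice".into()));
    println!("{dashboard:?}");

    // Low-priority frame fills in later without re-sending the rest.
    apply_patch(&mut dashboard, "analytics", Value::Str("4MB blob".into()));
    println!("{dashboard:?}");
}
```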
- Zero Clippy Warnings: All 44+ clippy warnings resolved across entire codebase
- Modern Format Strings: Updated to `format!("{var}")` syntax throughout
- Enhanced Error Handling: Proper Result patterns and async trait compatibility
- Memory Safety: Fixed await-holding lock patterns and buffer alignment issues
- 196 Tests Passing: Complete test suite with all features enabled
- Domain Layer Isolation: Custom `JsonData` value object replacing `serde_json::Value`
- Type Safety: Eliminated all architecture violations in the domain layer
- Seamless Conversion: `From` trait implementations for `JsonData ↔ serde_json::Value`
- Proper Boundaries: Clear separation between domain and infrastructure errors
- Axum v0.8 Compatibility: Updated route syntax from `:param` to `{param}` format
- StreamExt Integration: Fixed async stream processing with proper trait imports
- Body Type Updates: Modern HTTP body handling for the latest axum/hyper versions
- All Tests Passing: Complete HTTP integration test suite validation
- Architecture Compliance: Resolved all Clean Architecture violations
- Lint Standards: Zero warnings with strict linting enabled (`-D warnings`)
- Async Patterns: Fixed await-across-locks and other async safety issues
- Type System: Enhanced type safety with better generic bounds and aliases
| Metric | serde_json | sonic-rs | PJS | PJS Advantage |
|---|---|---|---|---|
| Small JSON (43B) | 275ns | 129ns | 312ns | Competitive |
| Medium JSON (351B) | 1,662ns | 434ns | 590ns | 2.8x vs serde |
| Large JSON (357KB) | 1,294μs | 216μs | 204μs | 6.3x vs serde, 1.06x vs sonic |
| Memory Efficiency | Baseline | Fast | 5.3x faster progressive | Bounded memory |
| Progressive Loading | Batch-only | Batch-only | 37μs vs 198μs | 5.3x faster |
- 6.3x faster than serde_json for large JSON processing
- 1.06x faster than sonic-rs (SIMD library) on large datasets
- 5.3x faster progressive loading vs traditional batch processing
- 1.71 GiB/s sustained throughput (exceeding sonic-rs 1.61 GiB/s)
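As a sanity check, the large-JSON figures above are mutually consistent: parsing 357 KB in about 204 μs works out to roughly 1.67 GiB/s, in line with the quoted 1.61-1.71 GiB/s range:

```rust
// Throughput in GiB/s from a payload size in bytes and a parse time in microseconds.
fn gib_per_sec(bytes: f64, micros: f64) -> f64 {
    (bytes / (micros * 1e-6)) / (1024.0 * 1024.0 * 1024.0)
}

fn main() {
    // 357 KB large-JSON benchmark parsed in ~204 us.
    let gibps = gib_per_sec(357.0 * 1024.0, 204.0);
    println!("{gibps:.2} GiB/s"); // ~1.67 GiB/s
}
```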
Add PJS to your `Cargo.toml`:

```toml
[dependencies]
pjson-rs = "0.3.0"

# Optional: for HTTP server integration
axum = "0.8"
tokio = { version = "1", features = ["full"] }
```
Or use cargo:

```shell
cargo add pjson-rs
```
```rust
use std::sync::Arc;

use pjson_rs::{
    application::{
        handlers::{InMemoryCommandHandler, InMemoryQueryHandler},
        services::{SessionService, StreamingService},
    },
    infrastructure::{
        adapters::{InMemoryStreamRepository, InMemoryEventPublisher, InMemoryMetricsCollector},
        http::axum_adapter::{create_pjs_router, PjsAppState},
    },
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Create infrastructure
    let repository = Arc::new(InMemoryStreamRepository::new());
    let event_publisher = Arc::new(InMemoryEventPublisher::new());
    let metrics_collector = Arc::new(InMemoryMetricsCollector::new());

    // Create CQRS handlers
    let command_handler = Arc::new(InMemoryCommandHandler::new(
        repository.clone(),
        event_publisher,
        metrics_collector.clone(),
    ));
    let query_handler = Arc::new(InMemoryQueryHandler::new(repository, metrics_collector));

    // Create services
    let session_service = Arc::new(SessionService::new(
        command_handler.clone(),
        query_handler.clone(),
    ));
    let streaming_service = Arc::new(StreamingService::new(command_handler));

    // Build the Axum app
    let app = create_pjs_router()
        .with_state(PjsAppState::new(session_service, streaming_service));

    // Start the server
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    println!("🚀 PJS Server running on http://127.0.0.1:3000");
    axum::serve(listener, app).await?;

    Ok(())
}
```
Use the official PJS client library for seamless integration:
```shell
npm install @pjs/client
```
```typescript
import { PjsClient, createHttpTransport } from '@pjs/client';

// Create a client with an HTTP transport
const client = new PjsClient({
  transport: createHttpTransport({
    baseUrl: 'http://localhost:3000',
    format: 'sse' // or 'json', 'ndjson'
  })
});

// Stream data with priority-based delivery
await client.stream({
  data: {
    users: [/* large array */],
    dashboard: { /* complex object */ }
  },
  onFrame: (frame) => {
    // Frames arrive in priority order
    if (frame.priority >= 90) {
      updateUI(frame.data); // Critical data first
    }
  }
});
```
```typescript
import { PjsClient, createWebSocketTransport } from '@pjs/client';

const client = new PjsClient({
  transport: createWebSocketTransport({
    url: 'ws://localhost:3001/ws'
  })
});

// Real-time streaming with priority handling
await client.connect();

client.onFrame((frame) => {
  console.log(`Priority ${frame.priority}:`, frame.data);

  // Handle based on priority
  switch (frame.priority) {
    case 'critical':
      showImmediate(frame.data);
      break;
    case 'high':
      queueForNextFrame(frame.data);
      break;
    default:
      processInBackground(frame.data);
  }
});
```
```rust
use futures::StreamExt;
use pjson_rs::ApplicationResult;
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> ApplicationResult<()> {
    // Connect to the WebSocket streaming server
    let (ws_stream, _) = connect_async("ws://127.0.0.1:3001/ws")
        .await
        .expect("Failed to connect");

    // This example only reads, so the write half goes unused
    let (_write, mut read) = ws_stream.split();

    // Receive prioritized frames
    while let Some(message) = read.next().await {
        match message? {
            Message::Text(text) => {
                let frame: serde_json::Value = serde_json::from_str(&text)?;

                match frame["@type"].as_str() {
                    Some("pjs_frame") => {
                        let priority = frame["@priority"].as_u64().unwrap_or(0);
                        if priority >= 200 {
                            println!("🚨 Critical data: {}", frame["data"]);
                        } else if priority >= 100 {
                            println!("📊 High priority: {}", frame["data"]);
                        } else {
                            println!("📝 Background data received");
                        }
                    }
                    Some("stream_complete") => {
                        println!("✅ Stream completed!");
                        break;
                    }
                    _ => {}
                }
            }
            _ => {}
        }
    }

    Ok(())
}
```
Start the interactive demos to see PJS in action:
```shell
# WebSocket streaming server with priority-based delivery
cargo run --bin websocket_streaming --manifest-path crates/pjs-demo/Cargo.toml

# Interactive demo with HTML interface and real-time visualization
cargo run --bin interactive_demo --manifest-path crates/pjs-demo/Cargo.toml

# Simple demo server with basic streaming
cargo run --bin simple_demo --manifest-path crates/pjs-demo/Cargo.toml

# Performance comparison demo (PJS vs traditional JSON)
cargo run --bin performance_comparison --manifest-path crates/pjs-demo/Cargo.toml
```
Or run the root-level examples:

```shell
# Complete Axum HTTP server
cargo run --example axum_server

# Advanced streaming demo server
cargo run --example streaming_demo_server

# Simple usage patterns
cargo run --example simple_usage
```

Then visit http://127.0.0.1:3000 to see priority-based streaming in action.
Perfect for:
- 📊 Real-time dashboards - Show key metrics instantly
- 📱 Mobile apps - Optimize for slow networks
- 🛍️ E-commerce - Load product essentials first
- 📈 Financial platforms - Prioritize critical trading data
- 🎮 Gaming leaderboards - Show player's rank immediately
PJS implements a clean, layered architecture following Domain-Driven Design principles:
Core business logic with value objects (Priority, SessionId, JsonPath) and aggregates (StreamSession) ensuring data consistency.
CQRS pattern with separate Command and Query handlers, plus high-level services (SessionService, StreamingService) orchestrating workflows.
Adapters implementing domain ports:
- Storage: In-memory repositories with thread-safe concurrent access
- Events: Publisher/subscriber pattern for domain event distribution
- Metrics: Performance monitoring with Prometheus integration
- HTTP: Complete Axum server with middleware stack
Multi-format streaming support:
- JSON: Standard response format
- NDJSON: Newline-delimited for efficient processing
- Server-Sent Events: Real-time browser compatibility
- Automatic format detection via Accept headers
Intelligent frame processing:
- Priority-based delivery: Critical data first
- Adaptive buffering: Dynamic sizing based on client performance
- Batch processing: High-throughput chunk aggregation
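Adaptive buffering can be sketched as a simple feedback rule on measured client lag. This is a conceptual illustration, not the crate's actual sizing logic; the thresholds are invented for the example:

```rust
// Grow the batch when the client keeps up, shrink it when the client lags.
// `client_lag_ms` stands in for some measured consumption delay.
fn next_batch_size(current: usize, client_lag_ms: u64) -> usize {
    const MIN: usize = 1;
    const MAX: usize = 256;
    if client_lag_ms < 10 {
        (current * 2).min(MAX) // fast client: batch more per flush
    } else if client_lag_ms > 100 {
        (current / 2).max(MIN) // slow client: smaller, more frequent flushes
    } else {
        current
    }
}

fn main() {
    let mut size = 8;
    for lag in [5, 5, 150, 40] {
        size = next_batch_size(size, lag);
        println!("lag {lag}ms -> batch size {size}");
    }
}
```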
```text
pjs/
├── crates/
│   ├── pjs-core/                # Core protocol, domain logic, and HTTP integration
│   │   ├── src/
│   │   │   ├── application/     # CQRS handlers, services, DTOs
│   │   │   ├── domain/          # Value objects, entities, aggregates
│   │   │   ├── infrastructure/  # HTTP, WebSocket, repositories, adapters
│   │   │   ├── parser/          # SIMD, zero-copy, buffer pools
│   │   │   ├── stream/          # Priority streaming, reconstruction
│   │   │   └── compression/     # Schema-based compression
│   │   ├── examples/            # Standalone demos (zero-copy, compression)
│   │   └── tests/               # Integration tests
│   ├── pjs-demo/                # Interactive demo servers with WebSocket streaming
│   │   └── src/
│   │       ├── servers/         # Demo server implementations
│   │       ├── clients/         # WebSocket client demos
│   │       ├── data/            # Sample data generators (analytics, ecommerce)
│   │       └── static/          # HTML interfaces
│   ├── pjs-js-client/           # JavaScript/TypeScript client library ✅ IMPLEMENTED
│   │   ├── src/                 # TypeScript source code with transport layers
│   │   ├── tests/               # Jest test suite with full coverage
│   │   └── package.json         # NPM configuration and dependencies
│   └── pjs-bench/               # Benchmarking suite
│       └── benches/             # Criterion.rs performance benchmarks
└── examples/                    # Root-level usage examples
    ├── axum_server.rs           # Complete HTTP server demo
    ├── simple_usage.rs          # Basic usage patterns
    └── streaming_demo_server.rs # Advanced streaming demo
```
- Phase 1: ✅ Core foundation (100% complete)
- Phase 2: ✅ Protocol layer (100% complete)
- Phase 3: ✅ Client/Server framework (100% complete)
- Phase 4: ✅ Transport layer (100% complete)
- Phase 5: ✅ Production features (100% complete)
- Phase 6: ✅ Real-Time Streaming (100% complete)
- Phase 7: ✅ JavaScript/TypeScript Client SDK (100% complete)
- Phase 8: ✅ Code Quality & Production Readiness (100% complete)
- Overall: ~98% of core functionality implemented
- ✅ pjs-core: Complete Rust implementation with Clean Architecture
- ✅ pjs-demo: Interactive demo servers with real-time WebSocket streaming
- ✅ pjs-js-client: Full TypeScript/JavaScript client library with transport layers
- ✅ pjs-bench: Comprehensive benchmarking suite with performance validation
- ✅ Examples: Multiple working examples from simple to advanced usage
The server provides a complete REST API:
```http
# Create a new session
POST /pjs/sessions
Content-Type: application/json

{
  "max_concurrent_streams": 10,
  "timeout_seconds": 3600,
  "client_info": "My App v1.0"
}
# Response: { "session_id": "sess_abc123", "expires_at": "..." }

# Get session info
GET /pjs/sessions/{session_id}

# Start streaming data
POST /pjs/stream/{session_id}
Content-Type: application/json

{
  "data": { "users": [...], "products": [...] },
  "priority_threshold": 50,
  "max_frames": 100
}

# Stream frames (JSON format)
GET /pjs/stream/{session_id}/frames?format=json&priority=80

# Real-time Server-Sent Events
GET /pjs/stream/{session_id}/sse
Accept: text/event-stream

# System health check
GET /pjs/health
# Response: { "status": "healthy", "version": "0.3.0" }
```
A complete working server is available at `examples/axum_server.rs`. To run it:

```shell
# Start the server
cargo run --example axum_server

# Test endpoints
curl -X POST http://localhost:3000/pjs/sessions \
  -H "Content-Type: application/json" \
  -d '{"max_concurrent_streams": 5}'

# Check health
curl http://localhost:3000/pjs/health

# View metrics
curl http://localhost:3000/examples/metrics
```
- Throughput: 1.71 GiB/s sustained in benchmarks, powered by sonic-rs
- Time to First Byte: <10ms for critical data
- Memory Efficiency: 5-10x reduction vs traditional parsing
- CPU Utilization: Full SIMD acceleration
- Rust nightly (required for `impl Trait` in associated types)
- CPU with AVX2 support (recommended for SIMD acceleration)
```shell
# Install nightly Rust
rustup install nightly

# Set nightly for this project
rustup override set nightly

# Or use nightly globally
rustup default nightly
```
```shell
# Clone repository
git clone https://github.com/bug-ops/pjs
cd pjs

# Ensure nightly Rust is active
rustup override set nightly

# Build with optimizations
cargo build --release

# Run tests
cargo test --workspace

# Run the complete HTTP server example
cargo run --example axum_server

# Build with optional features
cargo build --features "http-client,prometheus-metrics"
```
- `http-client`: Enable HTTP-based event publishing
- `prometheus-metrics`: Enable Prometheus metrics collection
- `simd-auto`: Auto-detect best SIMD support (default)
- `compression`: Enable compression middleware
PJS provides true zero-cost abstractions using nightly Rust features for maximum performance. The Universal Framework Integration Layer uses Generic Associated Types (GATs) with `impl Trait` to eliminate all runtime overhead:
```rust
use pjson_rs::infrastructure::integration::StreamingAdapter;
use std::future::Future;

// Zero-cost framework integration with GATs
impl StreamingAdapter for YourFramework {
    type Request = YourRequest;
    type Response = YourResponse;
    type Error = YourError;

    // True zero-cost futures - no Box allocation!
    type StreamingResponseFuture<'a>
        = impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
    where
        Self: 'a;

    fn create_streaming_response<'a>(
        &'a self,
        session_id: SessionId,
        frames: Vec<StreamFrame>,
        format: StreamingFormat,
    ) -> Self::StreamingResponseFuture<'a> {
        // Direct async block - the compiler generates the optimal Future type
        async move {
            // Your framework-specific logic here
            Ok(your_response)
        }
    }

    fn framework_name(&self) -> &'static str {
        "your_framework"
    }
}
```
Zero-Cost Abstractions:
- 1.82x faster trait dispatch vs async_trait
- Zero heap allocations for futures
- Pure stack allocation - no runtime overhead
- Static dispatch eliminates vtables
- Complete inlining for hot paths
- ✅ Axum: Full native integration with zero-cost GAT futures
- 🔧 Any Framework: Universal adapter with true zero-cost abstractions
- 📋 Planned: Helper macros for popular frameworks (Actix, Warp, Tide)
```rust
// Axum (native support)
use pjson_rs::infrastructure::http::axum_adapter::create_pjs_router;

let app = create_pjs_router().with_state(app_state);

// Custom framework integration
use pjson_rs::infrastructure::adapters::UniversalAdapter;

let adapter = UniversalAdapter::new()
    .with_serializer(your_serializer)
    .with_transport(your_transport);
```
The HTTP server includes production-ready middleware:
```rust
use pjson_rs::infrastructure::http::middleware::*;

let app = create_pjs_router()
    .layer(axum::middleware::from_fn(pjs_cors_middleware))
    .layer(axum::middleware::from_fn(security_middleware))
    .layer(axum::middleware::from_fn(health_check_middleware))
    .layer(
        PjsMiddleware::new()
            .with_compression(true)
            .with_metrics(true)
            .with_max_request_size(10 * 1024 * 1024),
    )
    .with_state(app_state);
```
Built-in Prometheus metrics support:
```rust
// Automatically tracked metrics:
// - pjs_active_sessions
// - pjs_total_sessions_created
// - pjs_frames_processed_total
// - pjs_bytes_streamed_total
// - pjs_frame_processing_time_ms

let metrics = collector.export_prometheus();
// Expose at a /metrics endpoint for Prometheus scraping
```
Comprehensive domain event tracking:
```rust
// Events automatically generated:
// - SessionCreated, SessionActivated, SessionEnded
// - StreamStarted, StreamCompleted, FrameGenerated
// - PriorityAdjusted, ErrorOccurred

publisher.subscribe("SessionCreated", |event| {
    println!("New session: {}", event.session_id());
});
```
We welcome contributions! See CONTRIBUTING.md for guidelines.
```shell
# Install development tools
rustup component add clippy rustfmt

# Run checks
cargo clippy --workspace
cargo fmt --check

# Run all tests
cargo test --workspace --all-features
```
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE)
- MIT License (LICENSE-MIT)
at your option.
Want to try PJS immediately? Here's the fastest way:
```shell
# Clone and run
git clone https://github.com/bug-ops/pjs
cd pjs

# Set nightly Rust (required)
rustup override set nightly

# Run the server
cargo run --example axum_server

# In another terminal, test the API
curl -X POST http://localhost:3000/pjs/sessions \
  -H "Content-Type: application/json" \
  -d '{"max_concurrent_streams": 5}'

# Try Server-Sent Events streaming
curl -N -H "Accept: text/event-stream" \
  http://localhost:3000/pjs/stream/{session_id}/sse
```
To verify the performance claims, run the comprehensive benchmark suite:
```shell
# Run all benchmarks
cargo bench -p pjs-bench

# Or run specific benchmarks
cargo bench -p pjs-bench --bench simple_throughput    # Core parsing speed
cargo bench -p pjs-bench --bench memory_benchmarks    # Memory efficiency
cargo bench -p pjs-bench --bench streaming_benchmarks # Progressive loading
```
Results show PJS 6.3x faster than serde_json and 1.06x faster than sonic-rs on large JSON.
The server will show:
- 🚀 Server starting message
- 📊 Health check endpoint
- 📝 Available API endpoints
- 🎯 Demo data streaming capabilities
- Connection lifecycle management ✅
- WebSocket real-time streaming ✅
- Performance benchmarks vs alternatives ✅
- JavaScript/TypeScript client library ✅
- Universal framework integration layer
- Schema validation engine
- Custom priority strategies
Built with:
- sonic-rs - Lightning fast SIMD JSON parser
- axum - Ergonomic web framework for Rust
- tokio - Async runtime for Rust
- bytes - Efficient byte buffer management
- 📖 Documentation - Complete protocol specification
- 📋 Changelog - Detailed version history
- 📊 Benchmarks - Comprehensive performance results
- 💬 Discussions - Questions and ideas
PJS: Because users shouldn't wait for data they don't need yet.