This project implements a real-time anomaly detection and summarization system for a water treatment facility. It generates sensor data (temperature, pressure, flow), detects anomalies (spikes, drifts, dropouts), summarizes anomalies using a large language model (LLM), and provides a REST API to monitor anomalies, summaries, and system status. The system uses Redis for data streaming and storage, FastAPI for the API, and LangChain for LLM integration, all containerized with Docker.
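For a sense of the data flow, here is a minimal, illustrative sketch of a sensor reading being published to Redis the way the data generator does. The Redis channel name and payload fields are assumptions for illustration, not the exact schema used by `generate.py`.

```python
# Illustrative only: a sensor reading pushed into Redis for downstream consumers.
# The channel name and payload fields are assumptions, not generate.py's exact schema.
import json
import random
from datetime import datetime, timezone

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

reading = {
    "sensor_id": "wtf-pipe-1",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "temperature": round(random.uniform(10.0, 35.0), 1),  # °C
    "pressure": round(random.uniform(1.0, 3.0), 2),       # bar
    "flow": round(random.uniform(20.0, 100.0), 1),        # L/min
}

# Publish for the anomaly detector; a list or stream could equally hold the history.
r.publish("sensor_data", json.dumps(reading))
```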
- Docker and Docker Compose: Install Docker Desktop for local development.
- Python 3.12: Required if running scripts outside Docker.
- Redis: Managed via Docker (included in `docker-compose.yml`).
- LLM Server: A `llama_cpp.server` instance (or a compatible OpenAI API server) running locally at `http://localhost:8008/v1` for summarization.
.
├── api/
│ ├── app.py
│ ├── Dockerfile
│ └── requirements.txt
├── data_generator/
│ ├── generate.py
│ ├── Dockerfile
│ └── requirements.txt
├── anomaly_detector/
│ ├── detector.py
│ ├── Dockerfile
│ └── requirements.txt
├── summary_generator/
│ ├── summarizer.py
│ ├── Dockerfile
│ └── requirements.txt
├── docker-compose.yml
└── README.md
Each component has a requirements.txt file. Example contents (adjust as needed):
# api/requirements.txt
fastapi==0.115.0
uvicorn==0.31.0
redis==5.0.8
python-dotenv==1.0.1
# data_generator/requirements.txt
redis==5.0.8
python-dotenv==1.0.1
# anomaly_detector/requirements.txt
redis==5.0.8
python-dotenv==1.0.1
# summary_generator/requirements.txt
redis==5.0.8
python-dotenv==1.0.1
langchain-openai==0.2.2
- Clone the Repository:
  git clone <repository-url>
  cd <repository-directory>
- Set Up Environment Variables: Create a `.env` file in the project root (a sketch of how a component loads these settings follows the steps below):
  REDIS_HOST=redis
  OPENAI_API_BASE=http://host.docker.internal:8008/v1
  OPENAI_API_KEY=unknown
  MODEL_NAME=llama-2-7b-chat.Q4_0.gguf
  TEMPERATURE=0.1
  SUMMARY_INTERVAL=60
  MAX_BUFFER_SIZE=50
- Set Up LLM Server:
  - Run a `llama_cpp.server` instance locally, exposing an OpenAI-compatible API at `http://localhost:8008/v1`.
  - Example command (adjust model path):
    python -m llama_cpp.server --model /path/to/llama-2-7b-chat.Q4_0.gguf --host 0.0.0.0 --port 8008
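The services read these settings at startup. Below is a minimal, illustrative sketch of how a component might load them with `python-dotenv`; the variable names match the `.env` example above, but the exact loading code in the scripts may differ.

```python
# config_example.py - illustrative only; the actual scripts may load settings differently.
import os

from dotenv import load_dotenv

load_dotenv()  # read variables from .env into the process environment

REDIS_HOST = os.getenv("REDIS_HOST", "redis")
OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "http://host.docker.internal:8008/v1")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "unknown")
MODEL_NAME = os.getenv("MODEL_NAME", "llama-2-7b-chat.Q4_0.gguf")
TEMPERATURE = float(os.getenv("TEMPERATURE", "0.1"))
SUMMARY_INTERVAL = int(os.getenv("SUMMARY_INTERVAL", "60"))
MAX_BUFFER_SIZE = int(os.getenv("MAX_BUFFER_SIZE", "50"))
```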
The anomaly detection system (`detector.py`) identifies three types of anomalies in sensor data (temperature, pressure, flow) from the water treatment facility:
- Spikes: Sudden, extreme deviations in sensor readings.
  - Temperature: < 5°C or > 40°C
  - Pressure: < 0.5 bar or > 4.0 bar
  - Flow: < 10 L/min or > 120 L/min
  - Example: A temperature reading of 45°C triggers a spike anomaly.
- Drifts: Sustained deviations outside normal operating ranges for a specified duration (15 seconds).
  - Temperature: < 10°C or > 35°C for > 15 seconds
  - Pressure: < 1.0 bar or > 3.0 bar for > 15 seconds
  - Flow: < 20 L/min or > 100 L/min for > 15 seconds
  - Example: A pressure reading of 0.8 bar persisting for 16 seconds triggers a drift anomaly.
- Dropouts: Gaps in sensor data exceeding 10 seconds.
  - Threshold: Time between consecutive sensor readings > 10 seconds
  - Example: If no data is received for 12 seconds, a dropout anomaly is triggered.
These thresholds are defined in `detector.py` under `SPIKE_THRESHOLDS` and `DRIFT_THRESHOLDS`. Adjust them by modifying the script to suit specific operational requirements.
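For illustration, here is a simplified sketch of how the spike and dropout checks can be expressed. The threshold values match the documentation above, but the function and field names are assumptions, and the real `detector.py` (especially its drift state tracking) may be structured differently.

```python
# Simplified sketch of the spike/dropout checks; not the actual detector.py implementation.
from datetime import datetime, timezone

SPIKE_THRESHOLDS = {  # hard limits that trigger an immediate spike anomaly
    "temperature": (5.0, 40.0),   # °C
    "pressure": (0.5, 4.0),       # bar
    "flow": (10.0, 120.0),        # L/min
}
DRIFT_THRESHOLDS = {  # sustained for > 15 s; per-sensor state tracking not shown here
    "temperature": (10.0, 35.0),
    "pressure": (1.0, 3.0),
    "flow": (20.0, 100.0),
}
DROPOUT_SECONDS = 10  # max allowed gap between consecutive readings


def check_spike(parameter: str, value: float) -> bool:
    """Return True if the reading is outside the hard spike limits."""
    low, high = SPIKE_THRESHOLDS[parameter]
    return value < low or value > high


def check_dropout(last_seen: datetime, now: datetime | None = None) -> bool:
    """Return True if no reading has arrived within the dropout window."""
    now = now or datetime.now(timezone.utc)
    return (now - last_seen).total_seconds() > DROPOUT_SECONDS
```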
The summarizer.py script uses LangChain (langchain-openai) to integrate with an LLM for generating concise summaries of detected anomalies:
- LLM Client: Configured to connect to an OpenAI-compatible `llama_cpp.server` endpoint (default: `http://localhost:8008/v1`).
- Model: Defaults to `llama-2-7b-chat.Q4_0.gguf`, configurable via the `MODEL_NAME` environment variable.
- Async Summarization: Uses `ainvoke` for non-blocking LLM calls to summarize anomalies every 60 seconds (configurable via `SUMMARY_INTERVAL`).
- Buffer Management: Limits the anomaly buffer to 50 entries (configurable via `MAX_BUFFER_SIZE`) to prevent context length errors.
- Prompt: Formats anomalies as JSON and instructs the LLM to produce a natural, technical summary with time ranges, critical values, and sensor IDs.
Example summary output:
During the period from 2025-05-28T00:01:00Z to 2025-05-28T00:02:00Z, sensor wtf-pipe-1 reported two anomalies. A temperature spike of 45.2°C was detected at 2025-05-28T00:01:23Z, and a pressure drift of 0.8 bar persisted for 16.3 seconds starting at 2025-05-28T00:01:30Z.
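A minimal sketch of this LangChain call pattern is shown below. It assumes the environment variables from the setup section, and the prompt wording is illustrative rather than the exact prompt used in `summarizer.py`.

```python
# Illustrative sketch of the async LangChain summarization call; summarizer.py may differ.
import asyncio
import json
import os

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model=os.getenv("MODEL_NAME", "llama-2-7b-chat.Q4_0.gguf"),
    base_url=os.getenv("OPENAI_API_BASE", "http://localhost:8008/v1"),
    api_key=os.getenv("OPENAI_API_KEY", "unknown"),
    temperature=float(os.getenv("TEMPERATURE", "0.1")),
)


async def summarize(anomalies: list[dict]) -> str:
    """Ask the LLM for a concise technical summary of the buffered anomalies."""
    prompt = (
        "Summarize the following water treatment anomalies in a short, technical "
        "paragraph, mentioning time ranges, critical values, and sensor IDs:\n"
        + json.dumps(anomalies, indent=2)
    )
    response = await llm.ainvoke(prompt)  # non-blocking call to the LLM server
    return response.content


if __name__ == "__main__":
    sample = [{"type": "spike", "timestamp": "2025-05-28T00:01:23Z",
               "sensor_id": "wtf-pipe-1", "parameter": "temperature", "value": 45.2}]
    print(asyncio.run(summarize(sample)))
```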
The FastAPI server (app.py) provides three endpoints to monitor the system:
- GET /anomalies
  - Description: Retrieve the latest anomalies from Redis (`anomaly_history`).
  - Query Parameter: `limit` (int, default: 100) – Maximum number of anomalies to return.
  - Response:
    [
      {
        "type": "spike",
        "timestamp": "2025-05-28T00:01:23Z",
        "sensor_id": "wtf-pipe-1",
        "parameter": "temperature",
        "value": 45.2,
        "message": "Temperature spike detected: 45.2"
      },
      ...
    ]
  - Errors: 500 if Redis or JSON parsing fails.
- GET /summary
  - Description: Retrieve all summaries from Redis (`summary_history`).
  - Response:
    {
      "summaries": [
        "During the period from 2025-05-28T00:01:00Z to 2025-05-28T00:02:00Z, ...",
        ...
      ],
      "count": 5
    }
  - Errors: 500 if Redis access fails.
- GET /status
  - Description: Check system health, including Redis connection and data stream activity.
  - Response:
    {
      "data_stream": {
        "healthy": true,
        "last_received": "2025-05-28T00:01:23Z",
        "message": "Data flowing normally"
      },
      "redis": {
        "connected": true,
        "message": "Redis connected"
      }
    }
  - Errors: Returns error messages in `redis.message` if Redis is unavailable.
Access the API at http://localhost:8000 (or the deployed host). Use /docs for interactive Swagger UI.
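As a rough illustration of what one of these handlers looks like, here is a simplified sketch of the `/anomalies` endpoint. The Redis key matches the one documented above, but the rest (connection setup, error handling) is an assumption and the real `app.py` may differ.

```python
# Simplified sketch of the /anomalies endpoint; the real app.py may differ in detail.
import json
import os

import redis
from fastapi import FastAPI, HTTPException, Query

app = FastAPI()
r = redis.Redis(host=os.getenv("REDIS_HOST", "redis"), port=6379, decode_responses=True)


@app.get("/anomalies")
def get_anomalies(limit: int = Query(100, ge=1)):
    """Return up to `limit` recent anomalies stored in the anomaly_history list."""
    try:
        raw_entries = r.lrange("anomaly_history", 0, limit - 1)
        return [json.loads(entry) for entry in raw_entries]
    except (redis.RedisError, json.JSONDecodeError) as exc:
        raise HTTPException(status_code=500, detail=str(exc))
```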
- Ensure Prerequisites:
  - Docker and Docker Compose installed.
  - LLM server running at `http://localhost:8008/v1` (adjust `OPENAI_API_BASE` if different).
- Create Directories: Organize the project as shown in the Project Structure section, placing each script and Dockerfile in the appropriate subdirectory.
- Build and Run with Docker Compose:
  docker-compose up --build
  This starts:
  - redis: Redis server (port 6379).
  - api: FastAPI server (port 8000).
  - data-generator: Generates sensor data every 2 seconds.
  - anomaly-detector: Detects anomalies in sensor data.
  - summary-generator: Summarizes anomalies every 60 seconds.
- Access the API:
  - Open `http://localhost:8000/docs` for the Swagger UI.
  - Test endpoints:
    curl http://localhost:8000/status
    curl http://localhost:8000/anomalies
    curl http://localhost:8000/summary
- Stop the Services:
  docker-compose down
- Logging: Each component (`generate.py`, `detector.py`, `summarizer.py`, `app.py`) logs to stdout, visible in Docker logs (`docker-compose logs <service>`).
- System Status: The `/status` endpoint monitors Redis connectivity and data stream health (data is considered stale if none has been received within 60 seconds).
- Anomaly History: Stored in Redis (`anomaly_history`, max 100 entries) and accessible via `/anomalies`.
- Summary History: Stored in Redis (`summary_history`) and accessible via `/summary`.
- Metrics: Add Prometheus or similar for advanced metrics (e.g., anomaly rates, API response times) if needed; a minimal instrumentation sketch follows.
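If you do add Prometheus, one lightweight option is the `prometheus_client` library. The sketch below is an assumption about how the detector could be instrumented, not code that exists in this repository.

```python
# Illustrative Prometheus instrumentation; not part of the current codebase.
from prometheus_client import Counter, start_http_server

# Counts detected anomalies, labelled by type (spike, drift, dropout).
ANOMALIES_TOTAL = Counter("anomalies_total", "Detected anomalies", ["type"])


def record_anomaly(anomaly_type: str) -> None:
    """Increment the counter for the given anomaly type."""
    ANOMALIES_TOTAL.labels(type=anomaly_type).inc()


# At service startup, expose /metrics on a port Prometheus can scrape:
# start_http_server(9100)
```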
- Environment Variables: Sensitive settings (e.g., `OPENAI_API_KEY`, `REDIS_HOST`) are loaded from a `.env` file, which should be excluded from version control (add `.env` to `.gitignore`).
- Network: Docker Compose uses a default network, isolating services. Expose only the API port (8000) externally.
- Redis: No authentication is configured by default. For production, enable Redis authentication or restrict access via network policies.
- LLM Server: The `llama_cpp.server` endpoint (`http://localhost:8008/v1`) is assumed local. Secure it with authentication or HTTPS in production.
- API Security: Add authentication (e.g., OAuth2) to FastAPI endpoints for production use. Consider rate limiting to prevent abuse.
- Data Validation: The system assumes valid JSON from Redis. Add input validation in `detector.py` and `summarizer.py` to handle malformed data robustly (see the sketch below).
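A minimal defensive-parsing sketch is shown below; the helper name, required fields, and logging style are assumptions, but the idea is simply to drop and log malformed entries instead of letting them crash the consumer loop.

```python
# Illustrative defensive JSON parsing for data read from Redis; helper name is assumed.
import json
import logging

logger = logging.getLogger(__name__)

REQUIRED_FIELDS = {"sensor_id", "timestamp", "parameter", "value"}


def parse_reading(raw: str) -> dict | None:
    """Return the parsed reading, or None if the payload is malformed."""
    try:
        reading = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning("Discarding malformed JSON payload: %r", raw)
        return None
    if not REQUIRED_FIELDS.issubset(reading):
        logger.warning("Discarding reading with missing fields: %r", reading)
        return None
    return reading
```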
- Redis Connection Errors: Ensure `REDIS_HOST=redis` resolves correctly in Docker. Check `docker-compose logs redis`.
- LLM Errors: Verify `llama_cpp.server` is running and accessible at `OPENAI_API_BASE`. Check the `summarizer` logs for errors.
- Context Length Errors: Adjust `MAX_BUFFER_SIZE` in `.env` if the LLM reports token limit issues.
- API Errors: Check the `api` logs for FastAPI or Redis issues.
For further assistance, contact the project maintainer or open an issue in the repository.