Kairo is a real-time event pipeline that consumes system events from Kafka, batches them in Redis, and generates concise AI-powered markdown reports on a scheduled interval using OpenAI via LangChain.
It is built with Bun and TypeScript, and is designed to run as a single long-lived backend service.
- Features
- Project Structure
- Tech Stack
- Prerequisites
- Installation
- Configuration Guide: Environment & Kafka Integration
- Running the Application: Development and Production
- Reports (screenshot attached)
- Testing
- Integration with Communication Channels
- Contributing
- License
- Real-time Kafka event consumption with SASL/SCRAM-SHA-256 + TLS authentication
- Atomic Redis queue with batch overflow handling — excess events are returned to the queue, never dropped
- Scheduled AI-powered markdown reports via OpenAI and LangChain
- Configurable batch size and report interval via environment variables
- Retry logic with exponential backoff for failed AI calls
- Dead-letter queue for unrecoverable batches (
ai:batch:failed) - Built-in event simulator for local development and demos
├── index.ts # Entry point, bootstrap, graceful shutdown
├── config/
│ ├── config.ts # Typed config object
│ ├── env.ts # Zod-validated env parsing
│ └── logger.ts # Pino logger instance
├── schema/
│ └── config.schema.ts # Zod env schema
├── src/
│ ├── kafka/
│ │ ├── kafka.ts # KafkaJS client (SASL + TLS)
│ │ ├── init.ts # initKafka / disconnectKafka
│ │ ├── producer.ts # connectProducer / sendMessage / disconnectProducer
│ │ └── consumer.ts # startConsumer / disconnectConsumer
│ ├── redis/
│ │ ├── redis.ts # Redis client (TLS)
│ │ ├── init.ts # initRedis / disconnectRedis
│ │ └── queue.ts # enqueueEvent / consumeBatch
│ ├── ai/
│ │ ├── report.pipeline.ts # Cron scheduler, retry logic, dead-letter
│ │ ├── report.generator.ts # LangChain + OpenAI agent
│ │ └── report.writer.ts # Saves report to disk
│ └── simulator/
│ ├── scenario.engine.ts # Orchestrates scenario loops
│ └── event.generator.ts # Generates typed event payloads
├── prompts/
│ └── prompts.ts # System and user prompt templates
└── util/
└── extract.report.ts # Extracts text from LangChain messages- Bun — runtime and package manager
- KafkaJS — Kafka producer and consumer
- node-redis — Redis client
- LangChain — AI agent orchestration
- OpenAI — report generation model
- node-cron — report scheduling
- Zod — runtime environment validation
- Pino — structured JSON logging
- TypeScript — strict mode enabled
- Bun v1.0 or higher
- A running Kafka broker with SASL/SCRAM-SHA-256 + TLS
- A running Redis instance with TLS
- An OpenAI API key
- A Kafka CA certificate at
cert/ca.pem
git clone https://github.com/Draftify/Kairo.git
cd Kairo
bun installHere is the example .env file that you need to create in the root of the project.
PORT=9070
KAFKA_BROKER=
KAFKA_PORT=
KAFKA_USERNAME=
KAFKA_PASSWORD=
KAFKA_CLIENT_ID=kairo
KAFKA_GROUP_ID=kairo-group
SIMULATION_ENABLED=false/true
REDIS_URL=
REPORT_INTERVAL_MINUTES=
OPENAI_MODEL=
OPENAI_API_KEY=
BATCH_LIMIT=15Kairo requires a CA certificate to establish a TLS connection to Kafka. Place it at:
cert/ca.pemDevelopment (with watch mode):
bun devProduction:
bun run build
bun startGenerated reports are saved to the ./reports/ directory as markdown files:
reports/
report-2026-04-26T04-22-27-472Z.md
report-2026-04-26T04-27-27-891Z.md
Each report is structured as:
## Summary — *what just happened*
## Key Metrics — *the numbers that matter*
## Anomalies & Warnings — *what needs eyes on it now*
**Bottom Line:** ...Example Report Screenshot:
You can integrate Kairo to post the generated reports directly to communication channels such as Microsoft Teams, Discord, Slack, or any custom webhook.
How it Works:
-
Create a Function:
First, create a function that handles communication with the desired service (e.g., Teams, Discord, Slack, or a custom webhook). This function should be configured with the necessary API keys or credentials for the respective service. -
Configure API Keys and Credentials:
Ensure that you configure the necessary API keys or credentials in your environment variables or config files. For example, you'll need:- A Slack Incoming Webhook URL for Slack.
- A Teams Webhook URL for Microsoft Teams.
- A Discord Webhook URL for Discord.
- The appropriate authentication method or endpoint for any custom webhooks.
In sha'Allah, the test code will be released within the next couple of months, as we continue to work diligently on its development.
If you would like to contribute to this, please open an issue on GitHub to discuss your ideas or proposed changes. Pull requests are also welcome.
This is available under the MIT License. You are free to use, modify, and distribute this project as you see fit.
