Eventstream pipeline for storing and re-sending events inside the system.
go get -v -u github.com/geniusrabbit/eventstream/cmd/eventstream
docker run -d -it --rm -v ./custom.config.hcl:/config.hcl \
geniusrabbit/eventstream
- kafka
- NATS & NATS stream
- Redis stream
- Clickhouse
- Vertica
- kafka
- NATS
- Redis stream
Supports two file formats YAML & HCL
stores {
clickhouse_1 {
connect = "{{@env:CLICKHOUSE_STORE_CONNECT}}"
buffer = 1000
init_query = [<<Q
CREATE TABLE IF NOT EXISTS stat.testlog (
timestamp DateTime
, datemark Date default toDate(timestamp)
, service String
, msg String
, error String
, created_at DateTime default now()
) Engine=Memory COMMENT 'The test table';
Q]
}
kafka_1 {
connect = "{{@env:KAFKA_EVENTS_CONNECT}}"
}
}
// Source could be any supported stream service like kafka, nats, etc...
sources {
nats_1 {
connect = "{{@env:NATS_SOURCE_CONNECT}}"
format = "json"
}
}
// Streams it's pipelines which have source and destination store
streams {
log_1 {
store = "clickhouse_1"
source = "nats_1"
target = "testlog"
// Optional if fields in log and in message the same
// Transforms into:
// INSERT INTO testlog (service, msg, error, timestamp) VALUES($srv, $msg, $err, @toDateTime($timestamp))
fields = "service=srv,msg,error=err,timestamp=@toDateTime({{timestamp:date}})"
where = "srv == \"main\""
metrics = [
{
name = "log.counter"
type = "counter"
tags {
server = "{{srv}}"
}
}
]
}
kafka_retranslate {
store = "kafka_1"
source = "nats_1"
targets = [
{
fields = {
server = "{{srv}}"
timestamp = "{{timestamp}}"
}
where = "type = \"statistic\""
}
]
where = "srv = \"events\""
}
}
Metrics helps analyze some events during processing and monitor streams state.
Every stream can process metrics with the keyword metrics
.
Example:
metrics = [
{
name = "log.counter"
type = "counter"
tags { server = "{{srv}}" }
},
{
name = "actions.counter"
type = "counter"
tags { action = "{{action}}" }
},
{...}
]
All metrics available by URL /metrics
with prometheus protocol.
To activate metrics need to define profile connection port.
SERVER_PROFILE_MODE=net
SERVER_PROFILE_LISTEN=:6060
curl "http://hostname:port/health-check"
{"status":"OK"}
- Add processing custom error metrics
- Add MySQL database storage
- Add PostgreSQL database storage
- Add MongoDB database storage
- Add HTTP/Ping driver storage
- Add Redis database storage
- Prepare evetstream as Framework extension
- Add Kafka stream writer support
- Add NATS stream writer support
- Add Redis stream source/storage support
- Add RabbitMQ stream source/storage support
- Add health check API
- Add customizable prometheus metrics
- Add 'where' stream condition (http://github.com/Knetic/govaluate)
- Ack message only if success
- Buffering all data until be stored
- Add support HCL config