Skip to content

Commit ae88abf

Browse files
committed
continuous downsampling using redis for checkpointing and notifications of new data
1 parent f7a765d commit ae88abf

12 files changed

+420
-156
lines changed

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@ rayon = "1.0.2"
1919
config = "0.9.1"
2020
serde_derive = "1.0.80"
2121
serde = "1.0.80"
22-
string_template = "0.1.0"
22+
string_template = "0.2.1"
2323
lazy_static = "1.2.0"
24+
redis = "0.9.1"

README.md

+22
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,32 @@
11
# downsampler
22
Utilities for transforming InfluxDB time series data
33

4+
## Get up and running
5+
The basics are:
6+
7+
1. Install Rust (to build the project): https://rustup.rs/
8+
1. git clone git@github.com:michaelr524/downsampler.git
9+
1. cd downsampler
10+
1. cargo build --release
11+
1. Edit config.toml to your needs
12+
1. Run like this: target/release/downsampler downsample -s '2018-08-13 15:22:06' -e '2018-08-17 19:03:17'
13+
414
The following operations are currently supported:
515

616
#### downsample
717
Create downsampled series from other series.
818

919
#### split
1020
Creates many measurements with a single series from measurements with multiple series.
21+
22+
#### listen - continuous downsampling
23+
Continuously downsamples the configured series as new data arrives.
24+
You need to have Redis running.
25+
Redis is used for notifying downsampler about new data.
26+
Additionally, Redis is used to store a checkpoint so that downsampler knows how to restart from the point where it stopped last time it ran.
27+
The way to send updates to Redis is:
28+
```
29+
HSET downsampler_updates "binance_BTCUSDT" "1537710900000000000"
30+
```
31+
The key will be used in the downsampled series name (check `listen` > `measurement_template` in `config.toml`) and the value is the timestamp in nanoseconds format.
32+
Downsampler will downsample all the data from the previous checkpoint up to the given timestamp.

config.toml

+11
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
[listen]
2+
redis_url = "redis://127.0.0.1:6379"
3+
poll_sleep_ms = 250
4+
measurement_template = "trades_{{id}}_{{time_interval}}"
5+
query_template = """
6+
select price, amount
7+
from glukoz."glukoz-rp".trades_{{id}}_{{time_interval}}
8+
WHERE time >= {{start}} AND time < {{end}}
9+
limit {{limit}}
10+
"""
11+
112
[influxdb]
213
url = "http://localhost:8086"
314
db = "glukoz"

src/cmdargs.rs

+42-22
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use chrono::{format::ParseError, offset::TimeZone, NaiveDateTime, Utc};
2+
use clap::ArgMatches;
23
use clap::{crate_version, App, Arg, ArgGroup, SubCommand};
34
use crate::utils::time::truncate_seconds;
45
use failure_derive::Fail;
@@ -47,17 +48,17 @@ pub enum Error {
4748
DurationTooLong(OutOfRangeError),
4849
}
4950

50-
pub enum Command {
51-
Downsample,
52-
Split,
53-
}
54-
55-
pub struct CmdArgs {
51+
pub struct TimePeriod {
5652
pub raw_start: NaiveDateTime,
5753
pub raw_end: NaiveDateTime,
5854
pub start: NaiveDateTime,
5955
pub end: NaiveDateTime,
60-
pub command: Command,
56+
}
57+
58+
pub enum CmdArgs {
59+
Downsample(TimePeriod),
60+
Split(TimePeriod),
61+
Listen,
6162
}
6263

6364
fn args_definitions<'a, 'b>() -> App<'a, 'b> {
@@ -121,23 +122,32 @@ years, year, y -- defined as 365.25 days
121122
.arg(duration_arg.clone())
122123
.group(period_end_group.clone()),
123124
)
125+
.subcommand(SubCommand::with_name("listen").about("Continuous downsampling"))
124126
}
125127

126128
pub fn parse_args() -> Result<CmdArgs, Error> {
127129
let args = args_definitions().get_matches();
128130

129-
let (args, command) = match args.subcommand() {
130-
("downsample", Some(subcommand)) => (subcommand, Command::Downsample),
131-
("split", Some(subcommand)) => (subcommand, Command::Split),
131+
match args.subcommand() {
132+
("downsample", Some(subcommand)) => {
133+
let time_period = parse_time_period(subcommand)?;
134+
Ok(CmdArgs::Downsample(time_period))
135+
}
136+
("split", Some(subcommand)) => {
137+
let time_period = parse_time_period(subcommand)?;
138+
Ok(CmdArgs::Split(time_period))
139+
}
140+
("listen", Some(_)) => Ok(CmdArgs::Listen),
132141
_ => {
133142
args_definitions().print_help().unwrap();
134143
return Err(Error::CommandMissing);
135144
}
136-
};
145+
}
146+
}
137147

148+
fn parse_time_period(args: &ArgMatches) -> Result<TimePeriod, Error> {
138149
let raw_start = parse_datetime(args.value_of("start"))
139150
.map_err(|e| Error::InvalidStartArgument(Box::new(e)))?;
140-
141151
let raw_end = if args.is_present("duration") {
142152
let duration = parse_duration(args.value_of("duration"))
143153
.map_err(|e| Error::InvalidDurationArgument(Box::new(e)))?;
@@ -147,27 +157,37 @@ pub fn parse_args() -> Result<CmdArgs, Error> {
147157
.map_err(|e| Error::InvalidEndArgument(Box::new(e)))?;
148158
datetime
149159
};
150-
151160
let start = truncate_seconds(raw_start);
152161
let end = truncate_seconds(raw_end);
153162

154-
Ok(CmdArgs {
163+
Ok(TimePeriod {
155164
start,
156165
end,
157166
raw_start,
158167
raw_end,
159-
command,
160168
})
161169
}
162170

163171
pub fn print_args_info(settings: &CmdArgs) {
164-
println!("Period {:?} - {:?}", settings.raw_start, settings.raw_end);
165-
println!("Period truncated {:?} - {:?}", settings.start, settings.end);
166-
println!(
167-
"Period in nanos {:?} - {:?}",
168-
settings.start.timestamp_nanos(),
169-
settings.end.timestamp_nanos()
170-
);
172+
if let Some(time_period) = match settings {
173+
CmdArgs::Downsample(time_period) => Some(time_period),
174+
CmdArgs::Split(time_period) => Some(time_period),
175+
CmdArgs::Listen => None,
176+
} {
177+
println!(
178+
"Period {:?} - {:?}",
179+
time_period.raw_start, time_period.raw_end
180+
);
181+
println!(
182+
"Period truncated {:?} - {:?}",
183+
time_period.start, time_period.end
184+
);
185+
println!(
186+
"Period in nanos {:?} - {:?}",
187+
time_period.start.timestamp_nanos(),
188+
time_period.end.timestamp_nanos()
189+
);
190+
}
171191
}
172192

173193
fn parse_duration(date_string: Option<&str>) -> Result<Duration, Error> {

src/downsampler.rs

+20-126
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,14 @@
1-
use chrono::{NaiveDateTime, TimeZone, Utc};
1+
use crate::cmdargs::TimePeriod;
2+
use crate::downsampling::is_downsampling_interval;
23
use crate::{
3-
cmdargs::CmdArgs,
4-
influx::{
5-
extract_float_value, from_json_values, get_range, influx_client, save_points, to_point,
6-
Error, FieldValue,
7-
},
8-
lttb::{lttb_downsample, DataPoint},
9-
settings::{Config, Field},
10-
utils::{error::print_err_and_exit, time::intervals},
4+
downsampling::downsample_period, influx::influx_client, settings::Config,
5+
utils::time::intervals,
116
};
12-
use influx_db_client::Client;
13-
use influx_db_client::Point;
14-
use lazy_static::lazy_static;
157
use rayon::prelude::*;
168
use std::collections::HashMap;
17-
use std::ops::Sub;
18-
use std::time::Duration as StdDuration;
199
use string_template::Template;
2010
use time::Duration;
2111

22-
lazy_static! {
23-
static ref UNIX_EPOCH: NaiveDateTime = Utc.timestamp(0, 0).naive_utc();
24-
}
25-
2612
pub fn pre_render_names(config: &Config, template: Template) -> HashMap<(u64, &str), String> {
2713
let mut map: HashMap<(u64, &str), String> =
2814
HashMap::with_capacity(config.vars.ids.len() * config.downsampler.intervals.len());
@@ -40,7 +26,7 @@ pub fn pre_render_names(config: &Config, template: Template) -> HashMap<(u64, &s
4026
map
4127
}
4228

43-
pub fn downsample(args: &CmdArgs, config: &Config) -> () {
29+
pub fn downsample(args: &TimePeriod, config: &Config) -> () {
4430
let client = influx_client(
4531
&config.influxdb.url,
4632
&config.influxdb.db,
@@ -58,116 +44,24 @@ pub fn downsample(args: &CmdArgs, config: &Config) -> () {
5844

5945
for (start, _end) in intervals(args.start, args.end, Duration::seconds(1)) {
6046
for interval_period in config.downsampler.intervals.iter() {
61-
if start.signed_duration_since(*UNIX_EPOCH).num_seconds()
62-
% (interval_period.duration_secs as i64)
63-
== 0
64-
{
65-
let measurement_name = measurements
66-
.get(&(interval_period.duration_secs, id))
67-
.unwrap();
68-
69-
downsample_period(
70-
config,
71-
&client,
72-
&query_template,
73-
id,
74-
start,
75-
interval_period.duration_secs,
76-
measurement_name,
77-
);
78-
}
47+
if is_downsampling_interval(&start, interval_period) {
48+
let measurement_name = measurements
49+
.get(&(interval_period.duration_secs, id))
50+
.unwrap();
51+
52+
downsample_period(
53+
config,
54+
&client,
55+
&query_template,
56+
id,
57+
start,
58+
interval_period.duration_secs,
59+
measurement_name,
60+
);
61+
}
7962
}
8063
}
8164

8265
println!("end {}", id);
8366
});
8467
}
85-
86-
pub fn downsample_period(
87-
config: &Config,
88-
client: &Client,
89-
query_template: &Template,
90-
id: &str,
91-
end: NaiveDateTime,
92-
interval_duration_secs: u64,
93-
measurement_name: &str,
94-
) {
95-
let duration = Duration::from_std(StdDuration::from_secs(interval_duration_secs)).unwrap();
96-
let begin = end.sub(duration);
97-
98-
let query_str = build_query(&query_template, id, begin, end, 0, "raw");
99-
let series = match get_range(&client, &query_str) {
100-
Ok(series) => series,
101-
Err(err) => match err {
102-
Error::NoResult => return,
103-
e => print_err_and_exit(e),
104-
},
105-
};
106-
let vals = from_json_values(series.values, &config.downsampler.fields)
107-
.unwrap_or_else(|e| print_err_and_exit(e));
108-
// let _count = vals.iter().count();
109-
// println!("{} - [{} - {}] ({})", i, start, end, _count);
110-
let subset = lttb_downsample(
111-
&vals,
112-
60,
113-
config.downsampler.x_field_index,
114-
config.downsampler.y_field_index,
115-
);
116-
let points = to_influx_points(measurement_name, &vals, &subset, &config.downsampler.fields);
117-
// println!("{:#?}", &points);
118-
// TODO: handle errors
119-
save_points(&client, &config.influxdb.retention_policy, points).unwrap();
120-
}
121-
122-
pub fn to_influx_points(
123-
measurement_name: &str,
124-
raw: &[Vec<FieldValue>],
125-
downsampled: &Option<Vec<&Vec<FieldValue>>>,
126-
fields: &[Field],
127-
) -> Vec<Point> {
128-
match downsampled {
129-
Some(downsampled) => downsampled
130-
.iter()
131-
.map(|v| to_point(v, measurement_name, fields))
132-
.collect(),
133-
_ => raw
134-
.iter()
135-
.map(|v| to_point(v, measurement_name, fields))
136-
.collect(),
137-
}
138-
}
139-
140-
// pass `limit: 0` to disable limit
141-
pub fn build_query(
142-
query_template: &Template,
143-
id: &str,
144-
start: NaiveDateTime,
145-
end: NaiveDateTime,
146-
limit: i64,
147-
time_interval: &str,
148-
) -> String {
149-
let start_str = start.timestamp_nanos().to_string();
150-
let end_str = end.timestamp_nanos().to_string();
151-
let limit_str = limit.to_string();
152-
153-
let mut map = HashMap::new();
154-
map.insert("id", id);
155-
map.insert("start", &start_str);
156-
map.insert("end", &end_str);
157-
map.insert("limit", &limit_str);
158-
map.insert("time_interval", time_interval);
159-
160-
query_template.render(&map)
161-
}
162-
163-
impl DataPoint for Vec<FieldValue> {
164-
fn get_x(&self, index: usize) -> f64 {
165-
let field_value = self.get(index).unwrap();
166-
extract_float_value(field_value)
167-
}
168-
169-
fn get_y(&self, index: usize) -> f64 {
170-
let field_value = self.get(index).unwrap();
171-
extract_float_value(field_value)
172-
}
173-
}

0 commit comments

Comments
 (0)