-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtimescale_setup.rb
154 lines (128 loc) · 4.73 KB
/
timescale_setup.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
require 'bundler/inline'
gemfile(true) do
source 'https://rubygems.org'
gem 'timescaledb'
gem 'pg'
gem 'activerecord'
gem 'pry'
gem 'faker'
end
require 'timescaledb'
require 'active_record'
require 'pp'
require 'faker'
# Connect to database using command line argument
ActiveRecord::Base.establish_connection(ENV['DATABASE_URL'])
# Define our IoT sensor measurement model
class Measurement < ActiveRecord::Base
extend Timescaledb::ActsAsHypertable
include Timescaledb::ContinuousAggregatesHelper
acts_as_hypertable time_column: 'time',
segment_by: 'device_id',
value_column: 'temperature'
scope :avg_temperature, -> { select('device_id, avg(temperature) as temperature').group('device_id') }
scope :avg_humidity, -> { select('device_id, avg(humidity) as humidity').group('device_id') }
scope :battery_stats, -> { select('device_id, min(battery_level) as min_battery, avg(battery_level) as battery_level').group('device_id') }
continuous_aggregates scopes: [:avg_temperature, :avg_humidity, :battery_stats],
timeframes: [:hour, :day],
refresh_policy: {
hour: {
start_offset: '3 hours',
end_offset: '1 hour',
schedule_interval: '1 hour'
},
day: {
start_offset: '3 days',
end_offset: '1 day',
schedule_interval: '1 day'
}
}
end
# Setup Hypertable (as you would in a migration)
ActiveRecord::Base.connection.instance_exec do
# Enable logging
ActiveRecord::Base.logger = Logger.new(STDOUT)
enable_extension('timescaledb')
# Drop existing tables and aggregates
Measurement.drop_continuous_aggregates rescue nil
drop_table(:measurements, if_exists: true, cascade: true)
# Create the measurements table as a hypertable
hypertable_options = {
time_column: 'time',
chunk_time_interval: '1 day',
compress_segmentby: 'device_id',
compress_orderby: 'time DESC',
compress_after: '7 days',
# drop_after: '30 days' # Optional: delete chunks after 30 days
}
create_table(:measurements, id: false, hypertable: hypertable_options) do |t|
t.timestamptz :time, null: false
t.text :device_id, null: false
t.float :temperature
t.float :humidity
t.float :battery_level
end
# Create continuous aggregates
Measurement.create_continuous_aggregates
end
# Generate sample data
def generate_sample_data(total: 1000, time: 1.month.ago)
devices = ['device1', 'device2', 'device3']
# Initialize base values for each device
device_states = devices.each_with_object({}) do |device, states|
states[device] = {
temperature: 25.0 + rand(-2.0..2.0), # Start around 25°C with small variation
humidity: 50.0 + rand(-5.0..5.0), # Start around 50% with small variation
battery_level: 100.0 # Start with full battery
}
end
total.times.map do
time = time + rand(60).seconds
device = devices.sample
current_state = device_states[device]
# Generate new values with small variations from previous values
new_temp = current_state[:temperature] + rand(-0.5..0.5) # Small temperature changes
new_temp = [30.0, [20.0, new_temp].max].min # Keep between 20-30°C
new_humidity = current_state[:humidity] + rand(-1.0..1.0) # Small humidity changes
new_humidity = [60.0, [40.0, new_humidity].max].min # Keep between 40-60%
new_battery = current_state[:battery_level] - rand(0.0..0.1) # Slow battery drain
new_battery = [0.0, new_battery].max # Don't go below 0%
# Update device state
device_states[device] = {
temperature: new_temp,
humidity: new_humidity,
battery_level: new_battery
}
# Return measurement
{
time: time,
device_id: device,
temperature: new_temp,
humidity: new_humidity,
battery_level: new_battery
}
end
end
# Insert sample data in batches
puts "Generating and inserting sample data..."
ActiveRecord::Base.logger = nil # Suppress logs during bulk insert
time = 1.month.ago
10.times do
batch = generate_sample_data(total: 10_000, time: time)
Measurement.insert_all(batch, returning: false)
time = batch.last[:time] + 1.second
print "."
end
ActiveRecord::Base.logger = Logger.new(STDOUT)
# Refresh aggregates and show some example queries
puts "\nRefreshing continuous aggregates..."
Measurement.refresh_aggregates
puts "\nLast hour average temperature by device:"
pp Measurement.avg_temperature.last_hour.group(:device_id).map(&:attributes)
puts "\nCompressing old chunks..."
old_chunks = Measurement.chunks.where("range_end < ?", 7.days.ago)
old_chunks.each(&:compress!)
puts "\nHypertable size details:"
pp Measurement.hypertable.detailed_size
puts "\nCompression statistics:"
pp Measurement.hypertable.compression_stats