Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exponential histogram #9003

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pgxn/neon/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl

EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql neon--1.4--1.5.sql neon--1.5--1.4.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"

EXTRA_CLEAN = \
Expand Down
22 changes: 20 additions & 2 deletions pgxn/neon/file_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
int32 dc;
time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0);
LWLockAcquire(lfc_lock, LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration, 1.0);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
Expand All @@ -1280,11 +1280,29 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
int32 dc;
bool reset = PG_GETARG_BOOL(0);
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1, 1.0);
if (reset)
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
}

PG_FUNCTION_INFO_V1(approximate_optimal_cache_size);

Datum
approximate_optimal_cache_size(PG_FUNCTION_ARGS)
{
if (lfc_size_limit != 0)
{
int32 dc;
time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0);
double min_hit_ratio = PG_ARGISNULL(1) ? 1.0 : PG_GETARG_FLOAT8(1);
LWLockAcquire(lfc_lock, LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration, min_hit_ratio);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
}
76 changes: 66 additions & 10 deletions pgxn/neon/hll.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Portions Copyright (c) 2014-2023, PostgreSQL Global Development Group
*
* Implements https://hal.science/hal-00465313/document
*
*
* Based on Hideaki Ohno's C++ implementation. This is probably not ideally
* suited to estimating the cardinality of very large sets; in particular, we
* have not attempted to further optimize the implementation as described in
Expand Down Expand Up @@ -126,22 +126,78 @@ addSHLL(HyperLogLogState *cState, uint32 hash)
/* Compute the rank of the remaining 32 - "k" (registerWidth) bits */
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS);

cState->regs[index][count] = now;
if (cState->regs[index][count].ts)
{
/* update histgoram */
int64_t delta = (now - cState->regs[index][count].ts)/USECS_PER_SEC;
uint32_t new_histogram[HIST_SIZE] = {0};
for (int i = 0; i < HIST_SIZE; i++) {
/* Use middle point of interval */
uint32 interval_log2 = pg_ceil_log2_32((delta + (HIST_MIN_INTERVAL*((1<<i) + ((1<<i)/2))/2)) / HIST_MIN_INTERVAL);
uint32 cell = Min(interval_log2, HIST_SIZE-1);
new_histogram[cell] += cState->regs[index][count].histogram[i];
}
memcpy(cState->regs[index][count].histogram, new_histogram, sizeof new_histogram);
}
cState->regs[index][count].ts = now;
cState->regs[index][count].histogram[0] += 1; // most recent access always goes to first histogram backet
}

static uint32_t
getAccessCount(const HyperLogLogRegister* reg, time_t duration)
{
uint32_t count = 0;
// Simplest solution is to take in account all points fro overlapped interval
// for (size_t i = 0; i < HIST_SIZE && HIST_MIN_INTERVAL*((1 << i)/2) <= duration; i++) {
for (size_t i = 0; i < HIST_SIZE; i++) {
uint32_t high_boundary = HIST_MIN_INTERVAL*(1 << i);
uint32_t low_boundary = HIST_MIN_INTERVAL*((1 << i)/2);
if (high_boundary >= duration) {
// Assume uniform distribution of points within interval and use proportional number of points
Assert(duration >= low_boundary);
count += reg->histogram[i] * (duration - low_boundary) / (high_boundary - low_boundary);
break; // it's last interval within specified time range
} else {
count += reg->histogram[i];
}
}
return count;
}

static uint8
getMaximum(const TimestampTz* reg, TimestampTz since)
getMaximum(const HyperLogLogRegister* reg, TimestampTz since, time_t duration, double min_hit_ratio)
{
uint8 max = 0;

for (size_t i = 0; i < HLL_C_BITS + 1; i++)
size_t i, j;
if (min_hit_ratio == 1.0)
{
if (reg[i] >= since)
for (i = 0; i < HLL_C_BITS + 1; i++)
{
max = i;
if (reg[i].ts >= since)
{
max = i;
}
}
}
else
{
uint32_t total_count = 0;
for (i = 0; i < HLL_C_BITS + 1; i++)
{
total_count += getAccessCount(&reg[i], duration);
}
if (total_count != 0)
{
for (i = 0; i < HLL_C_BITS + 1; i++)
{
// Take in account only bits with access frequncy exceeding maximal miss rate (1 - hit rate)
if (reg[i].ts >= since && 1.0 - (double)getAccessCount(&reg[i], duration) / total_count <= min_hit_ratio)
{
max = i;
}
}
}
}

return max;
}

Expand All @@ -150,7 +206,7 @@ getMaximum(const TimestampTz* reg, TimestampTz since)
* Estimates cardinality, based on elements added so far
*/
double
estimateSHLL(HyperLogLogState *cState, time_t duration)
estimateSHLL(HyperLogLogState *cState, time_t duration, double min_hit_ratio)
{
double result;
double sum = 0.0;
Expand All @@ -161,7 +217,7 @@ estimateSHLL(HyperLogLogState *cState, time_t duration)

for (i = 0; i < HLL_N_REGISTERS; i++)
{
R[i] = getMaximum(cState->regs[i], since);
R[i] = getMaximum(cState->regs[i], since, duration, min_hit_ratio);
sum += 1.0 / pow(2.0, R[i]);
}

Expand Down
23 changes: 17 additions & 6 deletions pgxn/neon/hll.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@
#define HLL_C_BITS (32 - HLL_BIT_WIDTH)
#define HLL_N_REGISTERS (1 << HLL_BIT_WIDTH)

/*
* Number of histogram cells. We use exponential histogram with first interval
* equals to one minutes. Autoscaler request LFC statistic with intervals 1,2,...,60 minutes
* so 2^8=64 seems to be enough for our needs.
*/
#define HIST_SIZE 8
#define HIST_MIN_INTERVAL 60 /* seconds */

/*
* HyperLogLog is an approximate technique for computing the number of distinct
* entries in a set. Importantly, it does this by using a fixed amount of
Expand All @@ -69,18 +77,21 @@
* modified timestamp >= the query timestamp. This value is the number of bits
* for this register in the normal HLL calculation.
*
* The memory usage is 2^B * (C + 1) * sizeof(TimetampTz), or 184kiB.
* Usage could be halved if we decide to reduce the required time dimension
* precision; as 32 bits in second precision should be enough for statistics.
* However, that is not yet implemented.
* The memory usage is 2^B * (C + 1) * sizeof(HyperLogLogRegister), or 920kiB.
*/
typedef struct
{
TimestampTz ts; /* last access timestamp */
uint32_t histogram[HIST_SIZE]; /* access counter exponential histogram */
} HyperLogLogRegister;

typedef struct HyperLogLogState
{
TimestampTz regs[HLL_N_REGISTERS][HLL_C_BITS + 1];
HyperLogLogRegister regs[HLL_N_REGISTERS][HLL_C_BITS + 1];
} HyperLogLogState;

extern void initSHLL(HyperLogLogState *cState);
extern void addSHLL(HyperLogLogState *cState, uint32 hash);
extern double estimateSHLL(HyperLogLogState *cState, time_t dutration);
extern double estimateSHLL(HyperLogLogState *cState, time_t dutration, double min_hit_ratio);

#endif
43 changes: 43 additions & 0 deletions test_runner/regress/test_lfc_working_set_approximation.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,46 @@ def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):

assert estimation_1k >= 20 and estimation_1k <= 40
assert estimation_10k >= 200 and estimation_10k <= 400


def test_optimal_cache_size_approximation(neon_simple_env: NeonEnv):
env = neon_simple_env

endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=256MB",
"neon.file_cache_size_limit=245MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.5'")
cur.execute(
"create table t_huge(pk integer primary key, count integer default 0, payload text default repeat('?', 128))"
)
cur.execute(
"create table t_small(pk integer primary key, count integer default 0, payload text default repeat('?', 128))"
)
cur.execute(
"insert into t_huge(pk) values (generate_series(1,1000000))"
) # table size is 21277 pages
cur.execute(
"insert into t_small(pk) values (generate_series(1,100000))"
) # table size is 2128 pages
time.sleep(2)
before = time.monotonic()
for _ in range(100):
cur.execute("select sum(count) from t_small")
cur.execute("select sum(count) from t_huge")
after = time.monotonic()
cur.execute(f"select approximate_working_set_size_seconds({int(after - before + 1)})")
ws_estimation = cur.fetchall()[0][0]
log.info(f"Working set size estimaton {ws_estimation}")
cur.execute(f"select approximate_optimal_cache_size({int(after - before + 1)}, 0.99)")
optimal_cache_size = cur.fetchall()[0][0]
log.info(f"Optimal cache size for 99% hit rate {optimal_cache_size}")
assert ws_estimation >= 20000 and ws_estimation <= 30000
assert optimal_cache_size >= 2000 and optimal_cache_size <= 3000
Loading