Commit

Fix actual retry number not matching exchange.max-retry-count.

restore logic of getRandomString for external function register

Signed-off-by: chenpingzeng <[email protected]>

[I4UMZH] Kryo Serialization Integration For Snapshot

Adding Secondary Spilling For OrderByOperator.

doc changes for RELOAD CUBE and SHOW CREATE CUBE

updated the ui document to show execution timeout property with its default value

handle exclusion of incomplete spill files for snapshots

Document update for enabling asynchronous spill mechanism for order by.

Changes to record snapshot capture metrics

updated ui execution timeout to 100 days, also updated document

SpilledJoinOptimizations, spiller blooms for eliminating spill probe
Added support for right outer scan when Build side spills.

Updating snapshot documentation with statistics details

Adding Configuration To Support Spilling In HDFS.

Corrected spelling mistakes in snapshot doc

Add varchar predicate limitation in cube documentation

Removing nodeId for creating HDFS spill subdirectories.
ahanapradhan committed Mar 7, 2022
1 parent 460d15c commit 3c08153
Showing 104 changed files with 3,815 additions and 579 deletions.
64 changes: 62 additions & 2 deletions hetu-docs/en/admin/properties.md
@@ -161,6 +161,30 @@ This section describes the most important config properties that may be used to
>
> This config property can be overridden by the `spill_window_operator` session property.

### `experimental.spill-build-for-outer-join-enabled`

> - **Type:** `boolean`
> - **Default value:** `false`
>
> Enables spill feature for right-outer and full-outer join operations.
>
>
>
> This config property can be overridden by the `spill_build_for_outer_join_enabled` session property.
### `experimental.inner-join-spill-filter-enabled`

> - **Type:** `boolean`
> - **Default value:** `false`
>
> Enables bloom-filter-based matching against spilled build-side data when deciding whether probe-side data needs to be spilled.
>
>
>
> This config property can be overridden by the `inner_join_spill_filter_enabled` session property.

### `experimental.spill-reuse-tablescan`

> - **Type:** `boolean`
@@ -179,7 +203,7 @@ This section describes the most important config properties that may be used to
>
> Directory where spilled content will be written. It can be a comma separated list to spill simultaneously to multiple directories, which helps to utilize multiple drives installed in the system.
>
>
> When `experimental.spiller-spill-to-hdfs` is set to `true`, `experimental.spiller-spill-path` must contain only a single directory.
>
> It is not recommended to spill to system drives. Most importantly, do not spill to the drive on which the JVM logs are written, as disk overutilization might cause JVM to pause for lengthy periods, causing queries to fail.
@@ -268,7 +292,7 @@ This section describes the most important config properties that may be used to
> - **Type:** `data size`
> - **Default value:** `512 MB`
>
> Sets memory selection threshold for revocable memory of operator to directly allocate revocable memory for remaining bytes ready to revoke.
### `experimental.prioritize-larger-spilts-memory-revoke`

@@ -277,6 +301,34 @@ This section describes the most important config properties that may be used to
>
> Enables prioritizing splits with larger revocable memory.
### `experimental.spill-non-blocking-orderby`

> - **Type:** `boolean`
> - **Default value:** `false`
>
> Enables the order by operator to spill asynchronously, i.e. it can keep accumulating input while a spill is in progress and initiate a secondary spill either when the accumulated secondary data exceeds a threshold or when the primary spill completes. The default threshold is the minimum of 20MB and 5% of available free memory. This property must be used in conjunction with the `experimental.spill-enabled` property.
>
>
>
> This config property can be overridden by the `spill_non_blocking_orderby` session property.
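
As an illustration only (the property names appear on this page; the values and spill paths are hypothetical, not recommended settings), asynchronous order by spilling might be enabled in `etc/config.properties` as follows:

```properties
# Hypothetical excerpt of etc/config.properties -- values are illustrative
experimental.spill-enabled=true
experimental.spiller-spill-path=/mnt/spill1,/mnt/spill2
experimental.spill-non-blocking-orderby=true
```
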
### `experimental.spiller-spill-to-hdfs`

> - **Type:** `boolean`
> - **Default value:** `false`
>
> Enables spilling into HDFS. When this property is set to `true`, the property `experimental.spiller-spill-profile` must be set, and `experimental.spiller-spill-path` must contain only a single path.
### `experimental.spiller-spill-profile`

> - **Type:** `string`
> - **No default value.** Must be set when spilling to HDFS is enabled.
>
>
> This property defines the [filesystem](../develop/filesystem.md) profile used to spill. The corresponding profile must exist in `etc/filesystem`. For example, if this property is set as `experimental.spiller-spill-profile=spill-hdfs`, a profile `spill-hdfs.properties` describing this filesystem must be created in `etc/filesystem` with the necessary information, including authentication type, config, and keytabs (if applicable; refer to [filesystem](../develop/filesystem.md) for details).
>
> This property is required when `experimental.spiller-spill-to-hdfs` is set to `true`. It must be included in the configuration files of all coordinators and all workers. The specified file system must be accessible by all workers, and they must be able to read from and write to the path declared in `experimental.spiller-spill-path` on the specified file system.
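
To illustrate, a profile for the example name `spill-hdfs` would live at `etc/filesystem/spill-hdfs.properties`. The sketch below is an assumption-laden example, not an authoritative template: the key names and values must be checked against the [filesystem](../develop/filesystem.md) documentation and adapted to the target HDFS cluster.

```properties
# Hypothetical etc/filesystem/spill-hdfs.properties -- verify keys against the filesystem doc
fs.client.type=hdfs
hdfs.config.resources=/opt/hadoop/etc/hadoop/core-site.xml,/opt/hadoop/etc/hadoop/hdfs-site.xml
hdfs.authentication.type=NONE
```
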
## Exchange Properties

Exchanges transfer data between openLooKeng nodes for different stages of a query. Adjusting these properties may help to resolve inter-node communication issues or improve network utilization.
@@ -837,6 +889,14 @@ helps with cache affinity scheduling.
>
> This can also be specified on a per-query basis using the `snapshot_retry_timeout` session property.
### `hetu.snapshot.useKryoSerialization`
> - **Type:** `boolean`
> - **Default value:** `false`
>
> Enables Kryo-based serialization for snapshots, instead of the default Java serializer.
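
A one-line sketch (illustrative only) of turning this on in `etc/config.properties`:

```properties
# Hypothetical excerpt -- use Kryo instead of Java serialization for snapshot state
hetu.snapshot.useKryoSerialization=true
```
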
## HTTP Client Configurations
### `http.client.idle-timeout`
10 changes: 10 additions & 0 deletions hetu-docs/en/admin/reliable-execution.md
@@ -57,6 +57,16 @@ The ability to recover from an error and resume from a snapshot does not come fo

It is suggested to only turn on distributed snapshot when necessary, i.e. for queries that run for a long time. For these types of workloads, the overhead of taking snapshots becomes negligible.

## Snapshot statistics

Snapshot capture and restore statistics are displayed in the CLI along with the query result when the CLI is launched in debug mode.

Snapshot capture statistics cover the size of the snapshots captured, the CPU time taken to capture them, and the wall time taken to capture them during the query. These statistics are displayed for all snapshots and, separately, for the last snapshot.

Snapshot restore statistics cover the number of times the query restored from snapshots, the size of the snapshots loaded for restoring, the CPU time taken for restoring, and the wall time taken for restoring. Restore statistics are displayed only when a restore (recovery) happened during the query.

![](../images/snapshot_statistics.png)

## Configurations

Configurations related to distributed snapshot feature can be found in [Properties Reference](properties.md#distributed-snapshot).
5 changes: 5 additions & 0 deletions hetu-docs/en/admin/spill.md
@@ -34,6 +34,10 @@ saturation of the configured spill paths.

openLooKeng treats spill paths as independent disks (see [JBOD](https://en.wikipedia.org/wiki/Non-RAID_drive_architectures#JBOD)), so there is no need to use RAID for spill.


## Spill To HDFS
Spilling directly into HDFS is also possible. To do so, `experimental.spiller-spill-to-hdfs` must be set to `true`, `experimental.spiller-spill-profile` must be set, and `experimental.spiller-spill-path` must contain only a single directory (refer to the `experimental.spiller-spill-to-hdfs` and `experimental.spiller-spill-profile` properties for more details).
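
For example (a sketch only; the profile name `spill-hdfs` and the path are illustrative assumptions), the relevant entries in `etc/config.properties` might look like:

```properties
# Hypothetical excerpt -- spill to a single HDFS directory via the spill-hdfs filesystem profile
experimental.spill-enabled=true
experimental.spiller-spill-to-hdfs=true
experimental.spiller-spill-profile=spill-hdfs
experimental.spiller-spill-path=/tmp/hetu-spill
```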

## Spill Compression


@@ -69,6 +73,7 @@ is enabled, if there is not enough memory, intermediate accumulated aggregation
### Order By

If you're trying to sort a larger amount of data, a significant amount of memory may be needed. When spill to disk for order by is enabled, if there is not enough memory, intermediate sorted results are written to disk. They are loaded back and merged with a lower memory footprint.
Generally, while a spill is in progress the operator is blocked from taking input, but when `experimental.spill-non-blocking-orderby` is set to `true`, order by uses an asynchronous mechanism to spill (see `experimental.spill-non-blocking-orderby`).

### Window functions

28 changes: 28 additions & 0 deletions hetu-docs/en/admin/web-interface.md
@@ -34,3 +34,31 @@ and statistics about the query is available by clicking the *JSON* link. These v
> - **Default value:** `false`
>
> Insecure authentication over HTTP is disabled by default. This could be overridden via "hetu.queryeditor-ui.allow-insecure-over-http" property of "etc/config.properties" (e.g. hetu.queryeditor-ui.allow-insecure-over-http=true).
### `hetu.queryeditor-ui.execution-timeout`

> - **Type:** `duration`
> - **Default value:** `100 DAYS`
>
> The UI execution timeout is set to 100 days by default. This can be overridden via the "hetu.queryeditor-ui.execution-timeout" property of "etc/config.properties".
### `hetu.queryeditor-ui.max-result-count`

> - **Type:** `int`
> - **Default value:** `1000`
>
> The UI max result count is set to 1000 by default. This can be overridden via the "hetu.queryeditor-ui.max-result-count" property of "etc/config.properties".
### `hetu.queryeditor-ui.max-result-size-mb`

> - **Type:** `size`
> - **Default value:** `1GB`
>
> The UI max result size is set to 1 GB by default. This can be overridden via the "hetu.queryeditor-ui.max-result-size-mb" property of "etc/config.properties".
### `hetu.queryeditor-ui.session-timeout`

> - **Type:** `duration`
> - **Default value:** `1 DAYS`
>
> The UI session timeout is set to 1 day by default. This can be overridden via the "hetu.queryeditor-ui.session-timeout" property of "etc/config.properties".
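
Taken together, these UI settings live in `etc/config.properties`. The sketch below only restates the defaults listed above; the value formats (short duration syntax such as `100d`, and `1GB` for the size) are assumptions about the accepted configuration syntax:

```properties
# Hypothetical excerpt -- UI-related settings shown at their documented defaults
hetu.queryeditor-ui.execution-timeout=100d
hetu.queryeditor-ui.max-result-count=1000
hetu.queryeditor-ui.max-result-size-mb=1GB
hetu.queryeditor-ui.session-timeout=1d
```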
Binary file added hetu-docs/en/images/snapshot_statistics.png
61 changes: 44 additions & 17 deletions hetu-docs/en/preagg/overview.md
@@ -62,8 +62,6 @@ Tables from following Connectors can be used as source to build a StarTree Cube.

2.1. Overcome the limitation of Creating Cube for larger dataset.

## Enabling and Disabling StarTree Cube
To enable:
```sql
@@ -122,6 +120,17 @@ SELECT nationkey, avg(nationkey), max(regionkey) FROM nation WHERE nationkey >=
Since the data inserted into the Cube was for `nationkey >= 5`, only queries matching this condition will utilize the Cube.
Queries not matching the condition would continue to work but won't use the Cube.

If the source table of a Cube gets updated, the corresponding Cube is expired automatically. To overcome
this issue, support has been added in the openLooKeng CLI by introducing the **RELOAD CUBE** command. The user can
manually reload a cube if the status of the Cube becomes INACTIVE or EXPIRED. The syntax to reload the
Cube nation_cube is as follows:

```sql
RELOAD CUBE nation_cube
```
Please note that this feature is only supported via the CLI. If an unexpected error occurs during the reload process, the user is shown the original SQL statement
so that the cube can be recreated manually.

## Building Cube for Large Dataset
One of the limitations with the current implementation is that Cube cannot be built for a larger dataset at once. This is due to the cluster memory limitation.
Processing large number of rows requires more memory than cluster is configured with. This results in query failing with message **Query exceeded per-node user memory
@@ -180,38 +189,55 @@ SHOW CUBES;
```

**Note:**
1. The system will try to rewrite all types of predicates into a Range to see if they can be merged together.
All continuous predicates will be merged into a single range predicate and the remaining predicates are left untouched.

Only the following types are supported and can be merged together.
`Integer, TinyInt, SmallInt, BigInt, Date, String`

For the String data type, the predicate merge logic works only if the strings end with a digit and are all of the same length.
For example,

```sql
INSERT INTO CUBE store_sales_cube WHERE store_id BETWEEN 'A01' AND 'A10';
INSERT INTO CUBE store_sales_cube WHERE store_id BETWEEN 'A11' AND 'A20';
```
After the insertion, the two predicates will be merged into `store_id BETWEEN 'A01' AND 'A20'`.

```sql
SELECT ss_store_id, sum(ss_sales_price) WHERE ss_store_id BETWEEN 'A05' AND 'A15'; - Cube would be used for this query.
```

Consider the following example where `store_id` values are not of same length.

```sql
INSERT INTO CUBE store_sales_cube WHERE store_id = 'A1';
INSERT INTO CUBE store_sales_cube WHERE store_id = 'A2'
```
The `store_id` predicate will be rewritten as `store_id >= 'A1' AND store_id < 'A3'` as per the varchar predicate merge logic.

```sql
INSERT INTO CUBE store_sales_cube WHERE store_id = 'A10'
```
Because of this predicate rewrite, some statements cannot be supported: the above `INSERT` would fail because `'A10'` is a subset of the merged range `store_id >= 'A1' AND store_id < 'A3'`, so users should be wary of this issue.

For other data types, it is difficult to determine whether two predicates are continuous, so they cannot be merged. Because of this, a particular Cube may not be used during query optimization even if it contains all the required data.

2. Predicate rewrite has some limitations as well. Consider the following query

```sql
INSERT INTO CUBE store_sales_cube WHERE ss_sold_date_sk > 2451911;
```
The predicate is rewritten as `ss_sold_date_sk >= 2451912` to support merging continuous predicates.
Since the predicate is rewritten, a query using the `ss_sold_date_sk > 2451911` predicate will not match the Cube predicate, so the Cube won't be used to
optimize the query. The same applies to predicates with the `<=` operator, i.e. `ss_sold_date_sk <= 2451911` is rewritten as `ss_sold_date_sk < 2451912`.

```sql
SELECT ss_sold_date_sk, .... FROM hive.tpcds_sf1.store_sales WHERE ss_sold_date_sk > 2451911
```
3. Only single column predicates can be merged.


## Open issues and Limitations
1. StarTree Cube is only effective when the group by cardinality is considerably fewer than the number of rows in source table.
@@ -223,7 +249,8 @@ SHOW CUBES;
5. OpenLooKeng CLI has been modified to ease the process of creating Cubes for larger datasets. But still there are limitations with this implementation
as the process involves merging multiple Cube predicates into one. Only Cube predicates defined on Integer, Long and Date types can be merged properly. Support for Char,
String types still need to be implemented.
6. Varchar predicates can be merged only if the values are of the same length.

## Performance Optimizations on Star Tree
1. Star Tree Query re-write optimization for same group by columns: If the group by columns of the cube and query matches, the query is
re-written internally to select the pre-aggregated data. If the group by columns does not matches, the additional aggregations are
18 changes: 18 additions & 0 deletions hetu-docs/en/preagg/statements.md
@@ -150,7 +150,25 @@ Show Cubes for `orders` table:
```sql
SHOW CUBES FOR orders;
```
## RELOAD CUBE

### Synopsis

``` sql
RELOAD CUBE cube_name
```

### Description
Reloads the Cube if the source table has been updated.

### Examples

If the source table `orders` of the cube `orders_cube` gets updated, the status of the cube `orders_cube`
becomes EXPIRED. Use the `RELOAD CUBE` command to overcome this issue as follows:

```sql
RELOAD CUBE orders_cube
```
## DROP CUBE

### Synopsis
32 changes: 32 additions & 0 deletions hetu-docs/en/sql/show-create-cube.md
@@ -0,0 +1,32 @@

SHOW CREATE CUBE
=================

Synopsis
--------

``` sql
SHOW CREATE CUBE cube_name
```

Description
-----------

Show the SQL statement that creates the specified cube.

Examples
--------

Create a cube `orders_cube` on the `orders` table as follows:

CREATE CUBE orders_cube ON orders WITH (AGGREGATIONS = (avg(totalprice), sum(totalprice), count(*)),
GROUP = (custKEY, ORDERkey), format= 'orc')

Use `SHOW CREATE CUBE` command to show the SQL statement that was used to create the cube `orders_cube`:

SHOW CREATE CUBE orders_cube;

``` sql
CREATE CUBE orders_cube ON orders WITH (AGGREGATIONS = (avg(totalprice), sum(totalprice), count(*)),
GROUP = (custKEY, ORDERkey), format= 'orc')
```
@@ -35,6 +35,7 @@
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;
import java.util.stream.Stream;

import static java.nio.file.StandardOpenOption.CREATE_NEW;
@@ -273,6 +274,40 @@ public void close()
        getHdfs().close();
    }

    @Override
    public long getUsableSpace(Path path) throws IOException
    {
        return getHdfs().getStatus(toHdfsPath(path)).getRemaining();
    }

    @Override
    public long getTotalSpace(Path path) throws IOException
    {
        return getHdfs().getStatus(toHdfsPath(path)).getCapacity();
    }

    @Override
    public Path createTemporaryFile(Path path, String prefix, String suffix) throws IOException
    {
        String randomNo = UUID.randomUUID().toString();
        Path finalPath = Paths.get(String.valueOf(path), prefix + randomNo + suffix);
        unwrapHdfsExceptions(() -> getHdfs().create(toHdfsPath(finalPath)));
        return finalPath;
    }

    @Override
    public Path createFile(Path path) throws IOException
    {
        unwrapHdfsExceptions(() -> getHdfs().create(toHdfsPath(path)));
        return path;
    }

    @Override
    public Stream<Path> getDirectoryStream(Path path, String prefix, String suffix) throws IOException
    {
        return list(path).filter(pth -> pth.getFileName().toString().startsWith(prefix) && pth.getFileName().toString().endsWith(suffix));
    }

    /**
     * Getter for filesystem object (lazy instantiation)
     *
