Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhance](ut) add ut for schema util #48811

Open
wants to merge 2 commits into
base: variant-sparse
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ Status _create_column_writer(uint32_t cid, const TabletColumn& column,
opt->need_bloom_filter = column.is_bf_column();
opt->need_bitmap_index = column.has_bitmap_index();
const auto& index = tablet_schema->inverted_index(column.parent_unique_id());
VLOG_DEBUG << "column: " << column.name() << " need_inverted_index: " << opt->need_inverted_index
<< " need_bloom_filter: " << opt->need_bloom_filter
<< " need_bitmap_index: " << opt->need_bitmap_index;
VLOG_DEBUG << "column: " << column.name()
<< " need_inverted_index: " << opt->need_inverted_index
<< " need_bloom_filter: " << opt->need_bloom_filter
<< " need_bitmap_index: " << opt->need_bitmap_index;

// init inverted index
if (index != nullptr &&
Expand Down Expand Up @@ -402,9 +403,9 @@ Status VariantColumnWriterImpl::_process_sparse_column(
return status;
}
VLOG_DEBUG << "dump sparse "
<< vectorized::schema_util::dump_column(
vectorized::ColumnObject::get_sparse_column_type(),
ptr->get_sparse_column());
<< vectorized::Block::dump_column(
ptr->get_sparse_column(),
vectorized::ColumnObject::get_sparse_column_type());
RETURN_IF_ERROR(
_sparse_column_writer->append(column->get_nullmap(), column->get_data(), num_rows));
++column_id;
Expand Down Expand Up @@ -497,10 +498,11 @@ bool VariantColumnWriterImpl::is_finalized() const {

Status VariantColumnWriterImpl::append_data(const uint8_t** ptr, size_t num_rows) {
DCHECK(!is_finalized());
const auto& src = *reinterpret_cast<const vectorized::ColumnObject*>(*ptr);
const auto* column = reinterpret_cast<const vectorized::VariantColumnData*>(*ptr);
const auto& src = *reinterpret_cast<const vectorized::ColumnObject*>(column->column_data);
auto* dst_ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
// TODO: if direct write we could avoid copy
dst_ptr->insert_range_from(src, 0, num_rows);
dst_ptr->insert_range_from(src, column->row_pos, num_rows);
return Status::OK();
}

Expand Down Expand Up @@ -660,8 +662,9 @@ Status VariantSubcolumnWriter::finalize() {
_opts.rowset_ctx->tablet_schema->column_by_uid(_tablet_column->parent_unique_id());
// refresh opts and get writer with flush column
vectorized::schema_util::inherit_column_attributes(parent_column, flush_column);
VLOG_DEBUG << "parent_column: " << parent_column.name() << " flush_column: "
<< flush_column.name() << " is_bf_column: " << parent_column.is_bf_column() << " "
VLOG_DEBUG << "parent_column: " << parent_column.name()
<< " flush_column: " << flush_column.name()
<< " is_bf_column: " << parent_column.is_bf_column() << " "
<< flush_column.is_bf_column();
RETURN_IF_ERROR(_create_column_writer(
0, flush_column, _opts.rowset_ctx->tablet_schema, _opts.inverted_index_file_writer,
Expand Down
63 changes: 14 additions & 49 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
}

if (scalar_root_column->is_column_string()) {
variant_column = ColumnObject::create(var.max_subcolumns_count());
// now, subcolumns have not been set, so we set it to 0
variant_column = ColumnObject::create(0);
parse_json_to_variant(*variant_column.get(),
assert_cast<const ColumnString&>(*scalar_root_column), config);
} else {
Expand Down Expand Up @@ -573,43 +574,6 @@ vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
return sorted;
}

// ---------------------------

// Render a single column as a human-readable table (all rows) for
// debugging/log output, by wrapping it in a one-column Block.
std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
    Block block;
    ColumnWithTypeAndName entry {col, type, col->get_name()};
    block.insert(entry);
    return block.dump_data(0, block.rows());
}

// ---------------------------
// Extract the value located at `path` from the JSONB-encoded `source`
// column into `dst`, by evaluating the built-in "jsonb_extract" function
// over a scratch block. Returns InternalError if the function is missing.
Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst) {
    const std::string json_path = path.to_jsonpath();
    // The function's argument/result type must match the source's nullability.
    DataTypePtr json_type = std::make_shared<DataTypeJsonb>();
    if (source->is_nullable()) {
        json_type = make_nullable(json_type);
    }
    auto string_type = std::make_shared<DataTypeString>();
    // The json path is passed as a const string argument.
    auto path_const =
            string_type->create_column_const(1, Field(String(json_path.data(), json_path.size())));
    ColumnsWithTypeAndName arguments {{source, json_type, ""}, {path_const, string_type, ""}};
    auto function =
            SimpleFunctionFactory::instance().get_function("jsonb_extract", arguments, json_type);
    if (!function) {
        return Status::InternalError("Not found function jsonb_extract");
    }
    // Append a result slot to a scratch block and execute the function into it.
    Block scratch {arguments};
    vectorized::ColumnNumbers arg_indices;
    arg_indices.emplace_back(0);
    arg_indices.emplace_back(1);
    size_t result_idx = scratch.columns();
    scratch.insert({nullptr, json_type, ""});
    RETURN_IF_ERROR(function->execute(nullptr, scratch, arg_indices, result_idx, source->size()));
    dst = scratch.get_by_position(result_idx)
                  .column->convert_to_full_column_if_const()
                  ->assume_mutable();
    return Status::OK();
}

bool has_schema_index_diff(const TabletSchema* new_schema, const TabletSchema* old_schema,
int32_t new_col_idx, int32_t old_col_idx) {
const auto& column_new = new_schema->column(new_col_idx);
Expand Down Expand Up @@ -641,8 +605,6 @@ TabletColumn create_sparse_column(const TabletColumn& variant) {
return res;
}

using PathToNoneNullValues = std::unordered_map<std::string, size_t>;

Status collect_path_stats(const RowsetSharedPtr& rs,
std::unordered_map<int32_t, PathToNoneNullValues>& uid_to_path_stats) {
SegmentCacheHandle segment_cache;
Expand Down Expand Up @@ -763,7 +725,8 @@ Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,
subcolumn.set_name(column->name_lower_case() + "." + subpath.to_string());
subcolumn.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
subcolumn.set_parent_unique_id(column->unique_id());
subcolumn.set_path_info(PathInData(column->name_lower_case() + "." + subpath.to_string()));
subcolumn.set_path_info(
PathInData(column->name_lower_case() + "." + subpath.to_string()));
subcolumn.set_aggregation_method(column->aggregation());
subcolumn.set_variant_max_subcolumns_count(column->variant_max_subcolumns_count());
subcolumn.set_is_nullable(true);
Expand All @@ -784,6 +747,11 @@ Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,
// Calculate statistics about variant data paths from the encoded sparse column
void calculate_variant_stats(const IColumn& encoded_sparse_column,
segment_v2::VariantStatisticsPB* stats) {
DCHECK(stats->sparse_column_non_null_size_size() <=
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE);
int limit = VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE -
stats->sparse_column_non_null_size_size();

// Cast input column to ColumnMap type since sparse column is stored as a map
const auto& map_column = assert_cast<const ColumnMap&>(encoded_sparse_column);

Expand All @@ -805,8 +773,7 @@ void calculate_variant_stats(const IColumn& encoded_sparse_column,
}
// If path doesn't exist and we haven't hit the max statistics size limit,
// add it with count 1
else if (sparse_data_paths_statistics.size() <
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
else if (sparse_data_paths_statistics.size() < limit) {
sparse_data_paths_statistics.emplace(path, 1);
}
}
Expand All @@ -815,13 +782,11 @@ void calculate_variant_stats(const IColumn& encoded_sparse_column,
// This maps each path string to its frequency count
for (const auto& [path, size] : sparse_data_paths_statistics) {
const auto& sparse_path = path.to_string();
auto it = stats->sparse_column_non_null_size().find(sparse_path);
if (it == stats->sparse_column_non_null_size().end()) {
stats->mutable_sparse_column_non_null_size()->emplace(sparse_path, size);
auto& count_map = *stats->mutable_sparse_column_non_null_size();
if (auto it = count_map.find(sparse_path); it != count_map.end()) {
it->second += size;
} else {
size_t original_size = it->second;
stats->mutable_sparse_column_non_null_size()->emplace(sparse_path,
original_size + size);
count_map.emplace(sparse_path, size);
}
}
}
Expand Down
17 changes: 12 additions & 5 deletions be/src/vec/common/schema_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ struct ColumnWithTypeAndName;

const std::string SPARSE_COLUMN_PATH = "__DORIS_VARIANT_SPARSE__";
namespace doris::vectorized::schema_util {

using PathToNoneNullValues = std::unordered_map<std::string, size_t>;

/// Returns number of dimensions in Array type. 0 if type is not array.
size_t get_number_of_dimensions(const IDataType& type);

Expand Down Expand Up @@ -122,17 +125,21 @@ void inherit_column_attributes(const TabletColumn& source, TabletColumn& target,
vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
const vectorized::ColumnObject::Subcolumns& subcolumns);

// Extract json data from source with path
Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst);

std::string dump_column(DataTypePtr type, const ColumnPtr& col);

bool has_schema_index_diff(const TabletSchema* new_schema, const TabletSchema* old_schema,
int32_t new_col_idx, int32_t old_col_idx);

// create ColumnMap<String, String>
TabletColumn create_sparse_column(const TabletColumn& variant);

// get the subpaths and sparse paths for the variant column
void get_subpaths(const TabletColumn& variant,
const std::unordered_map<int32_t, PathToNoneNullValues>& path_stats,
std::unordered_map<int32_t, TabletSchema::PathsSetInfo>& uid_to_paths_set_info);

// collect path stats from the rowset
Status collect_path_stats(const RowsetSharedPtr& rs,
std::unordered_map<int32_t, PathToNoneNullValues>& uid_to_path_stats);

// Build the temporary schema for compaction, this will reduce the memory usage of compacting variant columns
Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets, TabletSchemaSPtr& target);

Expand Down
8 changes: 5 additions & 3 deletions be/src/vec/olap/olap_data_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1186,16 +1186,18 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorVariant::convert_to_olap()
}
// Do nothing, the column writer will finally do finalize and write subcolumns one by one
// since we are not sure the final column(type and columns) until the end of the last block
// need to return the position of the column data
_variant_column_data = std::make_unique<VariantColumnData>(_value_ptr, _row_pos);
return Status::OK();
}

const void* OlapBlockDataConvertor::OlapColumnDataConvertorVariant::get_data() const {
if (!_value_ptr) {
return _root_data_convertor->get_data();
}
// return the ptr of original column, see VariantColumnWriterImpl::append_data
// which will cast to ColumnObject
return _value_ptr;
// return the ptr of VariantColumnData, see VariantColumnWriterImpl::append_data
// which will cast to VariantColumnData
return _variant_column_data.get();
}
const void* OlapBlockDataConvertor::OlapColumnDataConvertorVariant::get_data_at(
size_t offset) const {
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/olap/olap_data_convertor.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ class IOlapColumnDataAccessor {
virtual ~IOlapColumnDataAccessor() = default;
};

// Type-erased handle handed from the variant data convertor's get_data()
// to VariantColumnWriterImpl::append_data(), which casts column_data back
// to a vectorized::ColumnObject and copies rows starting at row_pos.
struct VariantColumnData {
// Pointer to the source column (a vectorized::ColumnObject); not owned.
const void* column_data;
// Index of the first row in column_data to append from.
size_t row_pos;
};

class OlapBlockDataConvertor {
public:
OlapBlockDataConvertor() = default;
Expand Down Expand Up @@ -533,6 +538,7 @@ class OlapBlockDataConvertor {
private:
const void* _value_ptr;
std::unique_ptr<OlapColumnDataConvertorVarChar> _root_data_convertor;
std::unique_ptr<VariantColumnData> _variant_column_data;
};

private:
Expand Down
109 changes: 0 additions & 109 deletions be/test/common/schema_util_test.cpp

This file was deleted.

Loading