Skip to content

Commit

Permalink
[fix](migrate) Copy binlog files (#41083)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Sep 21, 2024
1 parent b2a21e6 commit ebabec1
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 4 deletions.
63 changes: 63 additions & 0 deletions be/src/olap/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,69 @@ Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletU
return status;
}

Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
Version version, RowsetBinlogMetasPB* metas_pb) {
Status status;
auto tablet_uid_str = tablet_uid.to_string();
auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first);
auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1);
auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key](
std::string_view key, std::string_view value) -> bool {
VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value);
if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable.
// All binlog meta has been scanned
return false;
}

if (!starts_with_binlog_meta(key)) {
auto err_msg = fmt::format("invalid binlog meta key:{}", key);
status = Status::InternalError(err_msg);
LOG(WARNING) << err_msg;
return false;
}

BinlogMetaEntryPB binlog_meta_entry_pb;
if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) {
auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
status = Status::InternalError(err_msg);
LOG(WARNING) << err_msg;
return false;
}

const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();
auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
binlog_meta_pb->set_rowset_id(rowset_id);
binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
binlog_meta_pb->set_meta_key(std::string {key});
binlog_meta_pb->set_meta(std::string {value});

auto binlog_data_key =
make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
std::string binlog_data;
status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
if (!status.ok()) {
LOG(WARNING) << status.to_string();
return false;
}
binlog_meta_pb->set_data_key(binlog_data_key);
binlog_meta_pb->set_data(binlog_data);

return false;
};

Status iterStatus =
meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func);
if (!iterStatus.ok()) {
LOG(WARNING) << fmt::format(
"fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key,
version.to_string(), iterStatus.to_string());
return iterStatus;
}
return status;
}

Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
RowsetBinlogMetasPB* metas_pb) {
Status status;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/rowset_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class RowsetMetaManager {
static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
const std::vector<int64_t>& binlog_versions,
RowsetBinlogMetasPB* metas_pb);
// get all binlog metas of a tablet in version.
static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
Version version, RowsetBinlogMetasPB* metas_pb);
static Status remove_binlog(OlapMeta* meta, const std::string& suffix);
static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
RowsetBinlogMetasPB* metas_pb);
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2512,6 +2512,11 @@ Status Tablet::get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versio
binlog_versions, metas_pb);
}

Status Tablet::get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb) {
return RowsetMetaManager::get_rowset_binlog_metas(_data_dir->get_meta(), tablet_uid(),
binlog_versions, metas_pb);
}

std::string Tablet::get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const {
return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ class Tablet final : public BaseTablet {
std::string_view rowset_id) const;
Status get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versions,
RowsetBinlogMetasPB* metas_pb);
Status get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb);
std::string get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const;
std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,7 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
if (binlog_meta_filesize > 0) {
contain_binlog = true;
RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb));
VLOG_DEBUG << "load rowset binlog metas from file. file_path=" << binlog_metas_file;
}
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file));
}
Expand Down
102 changes: 99 additions & 3 deletions be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/pb_helper.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
Expand Down Expand Up @@ -262,9 +263,11 @@ Status EngineStorageMigrationTask::_migrate() {
}

std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets);
RowsetBinlogMetasPB rowset_binlog_metas_pb;
do {
// migrate all index and data files but header file
res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
res = _copy_index_and_data_files(full_path, temp_consistent_rowsets,
&rowset_binlog_metas_pb);
if (!res.ok()) {
break;
}
Expand Down Expand Up @@ -292,7 +295,8 @@ Status EngineStorageMigrationTask::_migrate() {
// we take the lock to complete it to avoid long-term competition with other tasks
if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) {
// force to copy the remaining data and index
res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
res = _copy_index_and_data_files(full_path, temp_consistent_rowsets,
&rowset_binlog_metas_pb);
if (!res.ok()) {
break;
}
Expand All @@ -307,6 +311,16 @@ Status EngineStorageMigrationTask::_migrate() {
}
}

// save rowset binlog metas
if (rowset_binlog_metas_pb.rowset_binlog_metas_size() > 0) {
auto rowset_binlog_metas_pb_filename =
fmt::format("{}/rowset_binlog_metas.pb", full_path);
res = write_pb(rowset_binlog_metas_pb_filename, rowset_binlog_metas_pb);
if (!res.ok()) {
break;
}
}

// generate new tablet meta and write to hdr file
res = _gen_and_write_header_to_hdr_file(shard, full_path, consistent_rowsets, end_version);
if (!res.ok()) {
Expand Down Expand Up @@ -350,10 +364,92 @@ void EngineStorageMigrationTask::_generate_new_header(
}

Status EngineStorageMigrationTask::_copy_index_and_data_files(
const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets) const {
const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets,
RowsetBinlogMetasPB* all_binlog_metas_pb) const {
RowsetBinlogMetasPB rowset_binlog_metas_pb;
for (const auto& rs : consistent_rowsets) {
RETURN_IF_ERROR(rs->copy_files_to(full_path, rs->rowset_id()));

Version binlog_versions = rs->version();
RETURN_IF_ERROR(_tablet->get_rowset_binlog_metas(binlog_versions, &rowset_binlog_metas_pb));
}

// copy index binlog files.
for (const auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) {
auto num_segments = rowset_binlog_meta.num_segments();
std::string_view rowset_id = rowset_binlog_meta.rowset_id();

RowsetMetaPB rowset_meta_pb;
if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
auto err_msg = fmt::format("fail to parse binlog meta data value:{}",
rowset_binlog_meta.data());
LOG(WARNING) << err_msg;
return Status::InternalError(err_msg);
}
const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
TabletSchema tablet_schema;
tablet_schema.init_from_pb(tablet_schema_pb);

// copy segment files and index files
for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
std::string segment_file_path = _tablet->get_segment_filepath(rowset_id, segment_index);
auto snapshot_segment_file_path =
fmt::format("{}/{}_{}.binlog", full_path, rowset_id, segment_index);

Status status = io::global_local_filesystem()->copy_path(segment_file_path,
snapshot_segment_file_path);
if (!status.ok()) {
LOG(WARNING) << "fail to copy binlog segment file. [src=" << segment_file_path
<< ", dest=" << snapshot_segment_file_path << "]" << status;
return status;
}
VLOG_DEBUG << "copy " << segment_file_path << " to " << snapshot_segment_file_path;

if (tablet_schema.get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
for (const auto& index : tablet_schema.indexes()) {
if (index.index_type() != IndexType::INVERTED) {
continue;
}
auto index_id = index.index_id();
auto index_file =
_tablet->get_segment_index_filepath(rowset_id, segment_index, index_id);
auto snapshot_segment_index_file_path =
fmt::format("{}/{}_{}_{}.binlog-index", full_path, rowset_id,
segment_index, index_id);
VLOG_DEBUG << "copy " << index_file << " to "
<< snapshot_segment_index_file_path;
status = io::global_local_filesystem()->copy_path(
index_file, snapshot_segment_index_file_path);
if (!status.ok()) {
LOG(WARNING)
<< "fail to copy binlog index file. [src=" << index_file
<< ", dest=" << snapshot_segment_index_file_path << "]" << status;
return status;
}
}
} else if (tablet_schema.has_inverted_index()) {
auto index_file = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
auto snapshot_segment_index_file_path =
fmt::format("{}/{}_{}.binlog-index", full_path, rowset_id, segment_index);
VLOG_DEBUG << "copy " << index_file << " to " << snapshot_segment_index_file_path;
status = io::global_local_filesystem()->copy_path(index_file,
snapshot_segment_index_file_path);
if (!status.ok()) {
LOG(WARNING) << "fail to copy binlog index file. [src=" << index_file
<< ", dest=" << snapshot_segment_index_file_path << "]" << status;
return status;
}
}
}
}

std::move(rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->begin(),
rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->end(),
google::protobuf::RepeatedFieldBackInserter(
all_binlog_metas_pb->mutable_rowset_binlog_metas()));

return Status::OK();
}

Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/task/engine_storage_migration_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <gen_cpp/olap_file.pb.h>

#include <mutex>
#include <shared_mutex>
#include <string>
Expand Down Expand Up @@ -69,7 +71,8 @@ class EngineStorageMigrationTask final : public EngineTask {
// TODO: hkp
// rewrite this function
Status _copy_index_and_data_files(const std::string& full_path,
const std::vector<RowsetSharedPtr>& consistent_rowsets) const;
const std::vector<RowsetSharedPtr>& consistent_rowsets,
RowsetBinlogMetasPB* all_binlog_metas_pb) const;

private:
StorageEngine& _engine;
Expand Down

0 comments on commit ebabec1

Please sign in to comment.