Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions be/src/io/cache/file_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ void FileBlock::reset_downloader_impl(std::lock_guard<std::mutex>& block_lock) {

Status FileBlock::set_downloaded(std::lock_guard<std::mutex>& /* block_lock */) {
DCHECK(_download_state != State::DOWNLOADED);
DCHECK_NE(_downloaded_size, 0);
if (_downloaded_size == 0) {
_download_state = State::EMPTY;
_downloader_id = 0;
return Status::InternalError("Try to set empty block {} as downloaded",
_block_range.to_string());
}
Status status = _mgr->_storage->finalize(_key, this->_block_range.size());
if (status.ok()) [[likely]] {
_download_state = State::DOWNLOADED;
Expand Down Expand Up @@ -147,7 +152,15 @@ Status FileBlock::append(Slice data) {
}

Status FileBlock::finalize() {
if (_downloaded_size != 0 && _downloaded_size != _block_range.size()) {
if (_downloaded_size == 0) {
std::lock_guard block_lock(_mutex);
_download_state = State::EMPTY;
_downloader_id = 0;
_cv.notify_all();
return Status::InternalError("Try to finalize an empty file block {}",
_block_range.to_string());
}
if (_downloaded_size != _block_range.size()) {
SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
size_t old_size = _block_range.size();
_block_range.right = _block_range.left + _downloaded_size - 1;
Expand Down
1 change: 1 addition & 0 deletions be/src/io/cache/file_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class FileBlock {
friend class BlockFileCache;
friend class CachedRemoteFileReader;
friend struct FileBlockCell;
friend class FileBlockTestAccessor;

public:
enum class State {
Expand Down
131 changes: 131 additions & 0 deletions be/test/io/cache/block_file_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@

namespace doris::io {

class FileBlockTestAccessor {
public:
static void set_state(FileBlock& block, FileBlock::State state) {
block._download_state = state;
}
static void set_downloader_id(FileBlock& block, uint64_t id) { block._downloader_id = id; }
static void set_downloaded_size(FileBlock& block, size_t size) {
block._downloaded_size = size;
}

static Status call_set_downloaded(FileBlock& block) {
std::lock_guard<std::mutex> lock(block._mutex);
return block.set_downloaded(lock);
}
};

fs::path caches_dir = fs::current_path() / "lru_cache_test";
std::string cache_base_path = caches_dir / "cache1" / "";
std::string tmp_file = caches_dir / "tmp_file";
Expand Down Expand Up @@ -7671,4 +7687,119 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) {
FileCacheFactory::instance()->_capacity = 0;
}

TEST_F(BlockFileCacheTest, finalize_empty_block) {
std::string my_cache_path = caches_dir / "empty_block_test" / "";
if (fs::exists(my_cache_path)) {
fs::remove_all(my_cache_path);
}
io::FileCacheSettings settings;
settings.capacity = 100;
settings.max_file_block_size = 100;
io::BlockFileCache mgr(my_cache_path, settings);
ASSERT_TRUE(mgr.initialize().ok());

for (int i = 0; i < 100; i++) {
if (mgr.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

auto key = io::BlockFileCache::hash("empty_block_test");
io::CacheContext context;
ReadStatistics rstats;
context.stats = &rstats;
context.cache_type = io::FileCacheType::NORMAL;

{
auto holder = mgr.get_or_set(key, 0, 10, context);
auto blocks = fromHolder(holder);
ASSERT_EQ(blocks.size(), 1);
auto block = blocks[0];
ASSERT_EQ(block->state(), io::FileBlock::State::EMPTY);

ASSERT_EQ(block->get_or_set_downloader(), io::FileBlock::get_caller_id());
ASSERT_EQ(block->state(), io::FileBlock::State::DOWNLOADING);

// Call finalize without calling append()
Status st = block->finalize();
ASSERT_FALSE(st.ok());
ASSERT_EQ(block->state(), io::FileBlock::State::EMPTY);
ASSERT_EQ(block->get_downloader(), 0);
}
if (fs::exists(my_cache_path)) {
fs::remove_all(my_cache_path);
}
}

TEST_F(BlockFileCacheTest, finalize_partial_block) {
std::string my_cache_path = caches_dir / "partial_block_test" / "";
if (fs::exists(my_cache_path)) {
fs::remove_all(my_cache_path);
}
io::FileCacheSettings settings;
settings.capacity = 100;
settings.max_file_block_size = 100;
io::BlockFileCache mgr(my_cache_path, settings);
ASSERT_TRUE(mgr.initialize().ok());

for (int i = 0; i < 100; i++) {
if (mgr.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

auto key = io::BlockFileCache::hash("partial_block_test");
io::CacheContext context;
ReadStatistics rstats;
context.stats = &rstats;
context.cache_type = io::FileCacheType::NORMAL;

{
auto holder = mgr.get_or_set(key, 0, 10, context);
auto blocks = fromHolder(holder);
ASSERT_EQ(blocks.size(), 1);
auto block = blocks[0];
ASSERT_EQ(block->get_or_set_downloader(), io::FileBlock::get_caller_id());

std::string data(5, '0');
ASSERT_TRUE(block->append(Slice(data.data(), data.size())).ok());

// Finalize a block that only has 5 bytes out of 10
Status st = block->finalize();
ASSERT_TRUE(st.ok());
ASSERT_EQ(block->state(), io::FileBlock::State::DOWNLOADED);
ASSERT_EQ(block->range().size(), 5);
ASSERT_EQ(block->range().right, 4);
}

// Verify it was shrunk in the cache
ASSERT_EQ(mgr.get_used_cache_size(io::FileCacheType::NORMAL), 5);

if (fs::exists(my_cache_path)) {
fs::remove_all(my_cache_path);
}
}

TEST_F(BlockFileCacheTest, set_downloaded_empty_block_branch) {
FileCacheKey key;
key.hash = io::BlockFileCache::hash("set_downloaded_empty_block_branch");
key.offset = 0;
key.meta.type = io::FileCacheType::NORMAL;
key.meta.expiration_time = 0;
key.meta.tablet_id = 0;

// mgr is intentionally nullptr: this branch returns before touching storage.
io::FileBlock block(key, 10, nullptr, io::FileBlock::State::EMPTY);
FileBlockTestAccessor::set_state(block, io::FileBlock::State::DOWNLOADING);
FileBlockTestAccessor::set_downloader_id(block, 123);
FileBlockTestAccessor::set_downloaded_size(block, 0);

Status st = FileBlockTestAccessor::call_set_downloaded(block);
ASSERT_FALSE(st.ok());
ASSERT_EQ(block.state(), io::FileBlock::State::EMPTY);
ASSERT_EQ(block.get_downloader(), 0);
}

} // namespace doris::io
Loading