diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp index 299888ab2e1c3c..501c53a6160bcc 100644 --- a/be/src/io/cache/file_block.cpp +++ b/be/src/io/cache/file_block.cpp @@ -113,7 +113,12 @@ void FileBlock::reset_downloader_impl(std::lock_guard& block_lock) { Status FileBlock::set_downloaded(std::lock_guard& /* block_lock */) { DCHECK(_download_state != State::DOWNLOADED); - DCHECK_NE(_downloaded_size, 0); + if (_downloaded_size == 0) { + _download_state = State::EMPTY; + _downloader_id = 0; + return Status::InternalError("Try to set empty block {} as downloaded", + _block_range.to_string()); + } Status status = _mgr->_storage->finalize(_key, this->_block_range.size()); if (status.ok()) [[likely]] { _download_state = State::DOWNLOADED; @@ -147,7 +152,15 @@ Status FileBlock::append(Slice data) { } Status FileBlock::finalize() { - if (_downloaded_size != 0 && _downloaded_size != _block_range.size()) { + if (_downloaded_size == 0) { + std::lock_guard block_lock(_mutex); + _download_state = State::EMPTY; + _downloader_id = 0; + _cv.notify_all(); + return Status::InternalError("Try to finalize an empty file block {}", + _block_range.to_string()); + } + if (_downloaded_size != _block_range.size()) { SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); size_t old_size = _block_range.size(); _block_range.right = _block_range.left + _downloaded_size - 1; diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h index bfe9a69ac9f5e7..fba47a0aff82a7 100644 --- a/be/src/io/cache/file_block.h +++ b/be/src/io/cache/file_block.h @@ -44,6 +44,7 @@ class FileBlock { friend class BlockFileCache; friend class CachedRemoteFileReader; friend struct FileBlockCell; + friend class FileBlockTestAccessor; public: enum class State { diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index 8a55b26199d857..6bb08c98bf13dc 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -23,6 +23,22 @@ namespace doris::io { +class FileBlockTestAccessor { +public: + static void set_state(FileBlock& block, FileBlock::State state) { + block._download_state = state; + } + static void set_downloader_id(FileBlock& block, uint64_t id) { block._downloader_id = id; } + static void set_downloaded_size(FileBlock& block, size_t size) { + block._downloaded_size = size; + } + + static Status call_set_downloaded(FileBlock& block) { + std::lock_guard lock(block._mutex); + return block.set_downloaded(lock); + } +}; + fs::path caches_dir = fs::current_path() / "lru_cache_test"; std::string cache_base_path = caches_dir / "cache1" / ""; std::string tmp_file = caches_dir / "tmp_file"; @@ -7671,4 +7687,119 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) { FileCacheFactory::instance()->_capacity = 0; } +TEST_F(BlockFileCacheTest, finalize_empty_block) { + std::string my_cache_path = caches_dir / "empty_block_test" / ""; + if (fs::exists(my_cache_path)) { + fs::remove_all(my_cache_path); + } + io::FileCacheSettings settings; + settings.capacity = 100; + settings.max_file_block_size = 100; + io::BlockFileCache mgr(my_cache_path, settings); + ASSERT_TRUE(mgr.initialize().ok()); + + for (int i = 0; i < 100; i++) { + if (mgr.get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + auto key = io::BlockFileCache::hash("empty_block_test"); + io::CacheContext context; + ReadStatistics rstats; + context.stats = &rstats; + context.cache_type = io::FileCacheType::NORMAL; + + { + auto holder = mgr.get_or_set(key, 0, 10, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + auto block = blocks[0]; + ASSERT_EQ(block->state(), io::FileBlock::State::EMPTY); + + ASSERT_EQ(block->get_or_set_downloader(), io::FileBlock::get_caller_id()); + ASSERT_EQ(block->state(), io::FileBlock::State::DOWNLOADING); + + // Call finalize without calling append() + Status st = block->finalize(); + ASSERT_FALSE(st.ok()); + ASSERT_EQ(block->state(), io::FileBlock::State::EMPTY); + ASSERT_EQ(block->get_downloader(), 0); + } + if (fs::exists(my_cache_path)) { + fs::remove_all(my_cache_path); + } +} + +TEST_F(BlockFileCacheTest, finalize_partial_block) { + std::string my_cache_path = caches_dir / "partial_block_test" / ""; + if (fs::exists(my_cache_path)) { + fs::remove_all(my_cache_path); + } + io::FileCacheSettings settings; + settings.capacity = 100; + settings.max_file_block_size = 100; + io::BlockFileCache mgr(my_cache_path, settings); + ASSERT_TRUE(mgr.initialize().ok()); + + for (int i = 0; i < 100; i++) { + if (mgr.get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + auto key = io::BlockFileCache::hash("partial_block_test"); + io::CacheContext context; + ReadStatistics rstats; + context.stats = &rstats; + context.cache_type = io::FileCacheType::NORMAL; + + { + auto holder = mgr.get_or_set(key, 0, 10, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + auto block = blocks[0]; + ASSERT_EQ(block->get_or_set_downloader(), io::FileBlock::get_caller_id()); + + std::string data(5, '0'); + ASSERT_TRUE(block->append(Slice(data.data(), data.size())).ok()); + + // Finalize a block that only has 5 bytes out of 10 + Status st = block->finalize(); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(block->state(), io::FileBlock::State::DOWNLOADED); + ASSERT_EQ(block->range().size(), 5); + ASSERT_EQ(block->range().right, 4); + } + + // Verify it was shrunk in the cache + ASSERT_EQ(mgr.get_used_cache_size(io::FileCacheType::NORMAL), 5); + + if (fs::exists(my_cache_path)) { + fs::remove_all(my_cache_path); + } +} + +TEST_F(BlockFileCacheTest, set_downloaded_empty_block_branch) { + FileCacheKey key; + key.hash = io::BlockFileCache::hash("set_downloaded_empty_block_branch"); + key.offset = 0; + key.meta.type = io::FileCacheType::NORMAL; + key.meta.expiration_time = 0; + key.meta.tablet_id = 0; + + // mgr is intentionally nullptr: this branch returns before touching storage. + io::FileBlock block(key, 10, nullptr, io::FileBlock::State::EMPTY); + FileBlockTestAccessor::set_state(block, io::FileBlock::State::DOWNLOADING); + FileBlockTestAccessor::set_downloader_id(block, 123); + FileBlockTestAccessor::set_downloaded_size(block, 0); + + Status st = FileBlockTestAccessor::call_set_downloaded(block); + ASSERT_FALSE(st.ok()); + ASSERT_EQ(block.state(), io::FileBlock::State::EMPTY); + ASSERT_EQ(block.get_downloader(), 0); +} + } // namespace doris::io