From 407f489825949d59c060812555d371b1431df126 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Fri, 31 Oct 2025 10:21:59 +0800 Subject: [PATCH 1/4] [feature](cloud) Read peer be cache when balance in same cluster (#56384) --- be/src/cloud/cloud_backend_service.cpp | 24 +- be/src/cloud/cloud_internal_service.cpp | 123 +++++++++ be/src/cloud/cloud_internal_service.h | 5 + be/src/cloud/cloud_tablet_mgr.cpp | 2 +- be/src/cloud/cloud_warm_up_manager.cpp | 85 +++++++ be/src/cloud/cloud_warm_up_manager.h | 28 +++ be/src/cloud/config.cpp | 6 + be/src/cloud/config.h | 4 + be/src/http/action/file_cache_action.cpp | 5 + .../io/cache/block_file_cache_downloader.cpp | 43 +++- be/src/io/cache/block_file_cache_factory.cpp | 35 +++ be/src/io/cache/block_file_cache_factory.h | 6 + be/src/io/cache/block_file_cache_profile.cpp | 7 +- be/src/io/cache/block_file_cache_profile.h | 7 + be/src/io/cache/cached_remote_file_reader.cpp | 179 +++++++++++++- be/src/io/cache/cached_remote_file_reader.h | 13 +- be/src/io/cache/file_cache_common.h | 2 + be/src/io/cache/peer_file_cache_reader.cpp | 167 +++++++++++++ be/src/io/cache/peer_file_cache_reader.h | 82 ++++++ be/src/io/fs/file_reader.h | 2 + be/src/io/fs/s3_file_reader.cpp | 18 +- be/src/io/io_common.h | 6 + be/src/olap/rowset/beta_rowset.cpp | 3 +- be/src/olap/storage_policy.cpp | 86 +++++++ be/src/olap/storage_policy.h | 3 + be/src/util/doris_metrics.cpp | 2 + be/src/util/doris_metrics.h | 1 + be/test/olap/storage_resource_test.cpp | 109 ++++++++ .../org/apache/doris/system/HeartbeatMgr.java | 3 +- gensrc/proto/internal_service.proto | 35 +++ .../regression/action/ProfileAction.groovy | 5 +- .../balance/test_balance_warm_up.groovy | 38 ++- ...test_balance_warm_up_use_peer_cache.groovy | 223 +++++++++++++++++ ...m_up_with_compaction_use_peer_cache.groovy | 234 ++++++++++++++++++ .../read_from_peer/test_read_from_peer.groovy | 178 +++++++++++++ 35 files changed, 1739 insertions(+), 30 deletions(-) create mode 100644 be/src/io/cache/peer_file_cache_reader.cpp create mode 100644 be/src/io/cache/peer_file_cache_reader.h create mode 100644 regression-test/suites/cloud_p0/balance/test_balance_warm_up_use_peer_cache.groovy create mode 100644 regression-test/suites/cloud_p0/balance/test_balance_warm_up_with_compaction_use_peer_cache.groovy create mode 100644 regression-test/suites/cloud_p0/cache/read_from_peer/test_read_from_peer.groovy diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index 06617bcbbacafb..44cbd5609fae4f 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -177,6 +177,12 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port << ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str(); + auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); + // Record each tablet in manager + for (int64_t tablet_id : request.tablet_ids) { + manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port); + } + std::string host = request.host; auto dns_cache = ExecEnv::GetInstance()->dns_cache(); if (dns_cache == nullptr) { @@ -186,6 +192,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons if (!status.ok()) { LOG(WARNING) << "failed to get ip from host " << request.host << ": " << status.to_string(); + // Remove failed tablets from tracking + manager.remove_balanced_tablets(request.tablet_ids); return; } } @@ -197,6 +205,8 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons if (!brpc_stub) { st = Status::RpcError("Address {} is wrong", brpc_addr); LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr; + // Remove failed tablets from tracking + manager.remove_balanced_tablets(request.tablet_ids); return; } brpc::Controller cntl; @@ -211,10 +221,20 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons if (!cntl.Failed()) { g_file_cache_warm_up_cache_async_submitted_segment_num << brpc_response.file_cache_block_metas().size(); - _engine.file_cache_block_downloader().submit_download_task( - std::move(*brpc_response.mutable_file_cache_block_metas())); + auto& file_cache_block_metas = *brpc_response.mutable_file_cache_block_metas(); + if (!file_cache_block_metas.empty()) { + _engine.file_cache_block_downloader().submit_download_task( + std::move(file_cache_block_metas)); + LOG(INFO) << "warm_up_cache_async: successfully submitted download task for tablets=" + << oss.str(); + } else { + LOG(INFO) << "warm_up_cache_async: no file cache block meta found, addr=" << brpc_addr; + manager.remove_balanced_tablets(request.tablet_ids); + } } else { st = Status::RpcError("{} isn't connected", brpc_addr); + // Remove failed tablets from tracking + manager.remove_balanced_tablets(request.tablet_ids); LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr << ", error=" << cntl.ErrorText(); } diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index e4300d6cfbb63a..dd03d29783fd16 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -19,6 +19,9 @@ #include +#include +#include + #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet.h" #include "cloud/cloud_tablet_mgr.h" @@ -27,10 +30,22 @@ #include "io/cache/block_file_cache.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" +#include "runtime/thread_context.h" +#include "runtime/workload_management/io_throttle.h" +#include "util/async_io.h" #include "util/debug_points.h" namespace doris { +bvar::Adder g_file_cache_get_by_peer_num("file_cache_get_by_peer_num"); +bvar::Adder g_file_cache_get_by_peer_blocks_num("file_cache_get_by_peer_blocks_num"); +bvar::Adder g_file_cache_get_by_peer_success_num("file_cache_get_by_peer_success_num"); +bvar::Adder g_file_cache_get_by_peer_failed_num("file_cache_get_by_peer_failed_num"); +bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency( + "file_cache_get_by_peer_server_latency"); +bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency( + "file_cache_get_by_peer_read_cache_file_latency"); + CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env) : PInternalService(exec_env), _engine(engine) {} @@ -153,6 +168,114 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( << ", response=" << response->DebugString(); } +void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller + [[maybe_unused]], + const PFetchPeerDataRequest* request, + PFetchPeerDataResponse* response, + google::protobuf::Closure* done) { + // TODO(dx): use async thread pool to handle the request, not AsyncIO + brpc::ClosureGuard closure_guard(done); + g_file_cache_get_by_peer_num << 1; + if (!config::enable_file_cache) { + LOG_WARNING("try to access file cache data, but file cache not enabled"); + return; + } + int64_t begin_ts = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + const auto type = request->type(); + const auto& path = request->path(); + response->mutable_status()->set_status_code(TStatusCode::OK); + if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) { + // Read specific range [file_offset, file_offset+file_size) across cached blocks + auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path); + for (auto& cb : datas) { + *(response->add_datas()) = std::move(cb); + } + } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) { + // Multiple specific blocks + auto hash = io::BlockFileCache::hash(path); + auto* cache = io::FileCacheFactory::instance()->get_by_path(hash); + if (cache == nullptr) { + g_file_cache_get_by_peer_failed_num << 1; + response->mutable_status()->add_error_msgs("can't get file cache instance"); + response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR); + return; + } + io::CacheContext ctx {}; + // ensure a valid stats pointer is provided to cache layer + io::ReadStatistics local_stats; + ctx.stats = &local_stats; + for (const auto& cb_req : request->cache_req()) { + size_t offset = static_cast(std::max(0, cb_req.block_offset())); + size_t size = static_cast(std::max(0, cb_req.block_size())); + auto holder = cache->get_or_set(hash, offset, size, ctx); + for (auto& fb : holder.file_blocks) { + auto state = fb->state(); + if (state != io::FileBlock::State::DOWNLOADED) { + g_file_cache_get_by_peer_failed_num << 1; + LOG(WARNING) << "read cache block failed, state=" << state; + response->mutable_status()->add_error_msgs("read cache file error"); + response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR); + return; + } + g_file_cache_get_by_peer_blocks_num << 1; + doris::CacheBlockPB* out = response->add_datas(); + out->set_block_offset(static_cast(fb->offset())); + out->set_block_size(static_cast(fb->range().size())); + std::string data; + data.resize(fb->range().size()); + // Offload the file block read to a dedicated OS thread to avoid bthread IO + Status read_st = Status::OK(); + // due to file_reader.cpp:33] Check failed: bthread_self() == 0 + int64_t begin_read_file_ts = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + auto task = [&] { + // Current thread not exist ThreadContext, usually after the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext and bind a Task. + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker()); + Slice slice(data.data(), data.size()); + read_st = fb->read(slice, /*read_offset=*/0); + }; + AsyncIO::run_task(task, io::FileSystemType::LOCAL); + int64_t end_read_file_ts = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + g_file_cache_get_by_peer_read_cache_file_latency + << (end_read_file_ts - begin_read_file_ts); + if (read_st.ok()) { + out->set_data(std::move(data)); + } else { + g_file_cache_get_by_peer_failed_num << 1; + LOG(WARNING) << "read cache block failed: " << read_st; + response->mutable_status()->add_error_msgs("read cache file error"); + response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR); + return; + } + } + } + } + DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", { + int st_us = dp->param("sleep", 1000); + LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us); + // sleep us + bthread_usleep(st_us); + }); + + int64_t end_ts = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts); + g_file_cache_get_by_peer_success_num << 1; + + VLOG_DEBUG << "fetch cache request=" << request->DebugString() + << ", response=" << response->DebugString(); +} + +#include "common/compile_check_end.h" + bvar::Adder g_file_cache_event_driven_warm_up_submitted_segment_num( "file_cache_event_driven_warm_up_submitted_segment_num"); bvar::Adder g_file_cache_event_driven_warm_up_finished_segment_num( diff --git a/be/src/cloud/cloud_internal_service.h b/be/src/cloud/cloud_internal_service.h index 59d8739cbf46d6..db4916313fe596 100644 --- a/be/src/cloud/cloud_internal_service.h +++ b/be/src/cloud/cloud_internal_service.h @@ -48,6 +48,11 @@ class CloudInternalServiceImpl final : public PInternalService { const PRecycleCacheRequest* request, PRecycleCacheResponse* response, google::protobuf::Closure* done) override; + // Get file cached data about the path in file cache + void fetch_peer_data(google::protobuf::RpcController* controller, + const PFetchPeerDataRequest* request, PFetchPeerDataResponse* response, + google::protobuf::Closure* done) override; + private: CloudStorageEngine& _engine; }; diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index bf58c6335ddd29..81aa125c4b746d 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -481,7 +481,7 @@ void CloudTabletMgr::build_all_report_tablets_info(std::map* tablet->build_tablet_report_info(&tablet_info); using namespace std::chrono; int64_t now = duration_cast(system_clock::now().time_since_epoch()).count(); - if (now - g_tablet_report_inactive_duration_ms * 1000 < tablet->last_access_time_ms) { + if (now - g_tablet_report_inactive_duration_ms < tablet->last_access_time_ms) { // the tablet is still being accessed and used in recently, so not report it return; } diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index b39b141f366c4a..91226969122f39 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -78,6 +78,7 @@ bvar::Adder g_file_cache_recycle_cache_requested_index_num( bvar::Status g_file_cache_warm_up_rowset_last_call_unix_ts( "file_cache_warm_up_rowset_last_call_unix_ts", 0); bvar::Adder file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num"); +bvar::Adder g_balance_tablet_be_mapping_size("balance_tablet_be_mapping_size"); bvar::LatencyRecorder g_file_cache_warm_up_rowset_wait_for_compaction_latency( "file_cache_warm_up_rowset_wait_for_compaction_latency"); @@ -95,6 +96,11 @@ CloudWarmUpManager::~CloudWarmUpManager() { if (_download_thread.joinable()) { _download_thread.join(); } + + for (auto& shard : _balanced_tablets_shards) { + std::lock_guard lock(shard.mtx); + shard.tablets.clear(); + } } std::unordered_map snapshot_rs_metas(BaseTablet* tablet) { @@ -728,4 +734,83 @@ void CloudWarmUpManager::recycle_cache(int64_t tablet_id, } } +// Balance warm up cache management methods implementation +void CloudWarmUpManager::record_balanced_tablet(int64_t tablet_id, const std::string& host, + int32_t brpc_port) { + auto& shard = get_shard(tablet_id); + std::lock_guard lock(shard.mtx); + JobMeta meta; + meta.be_ip = host; + meta.brpc_port = brpc_port; + shard.tablets.emplace(tablet_id, std::move(meta)); + g_balance_tablet_be_mapping_size << 1; + VLOG_DEBUG << "Recorded balanced warm up cache tablet: tablet_id=" << tablet_id + << ", host=" << host << ":" << brpc_port; +} + +std::optional> CloudWarmUpManager::get_balanced_tablet_info( + int64_t tablet_id) { + auto& shard = get_shard(tablet_id); + std::lock_guard lock(shard.mtx); + auto it = shard.tablets.find(tablet_id); + if (it == shard.tablets.end()) { + return std::nullopt; + } + return std::make_pair(it->second.be_ip, it->second.brpc_port); +} + +void CloudWarmUpManager::remove_balanced_tablet(int64_t tablet_id) { + auto& shard = get_shard(tablet_id); + std::lock_guard lock(shard.mtx); + auto it = shard.tablets.find(tablet_id); + if (it != shard.tablets.end()) { + shard.tablets.erase(it); + g_balance_tablet_be_mapping_size << -1; + VLOG_DEBUG << "Removed balanced warm up cache tablet by timer, tablet_id=" << tablet_id; + } +} + +void CloudWarmUpManager::remove_balanced_tablets(const std::vector& tablet_ids) { + // Group tablet_ids by shard to minimize lock contention + std::array, SHARD_COUNT> shard_groups; + for (int64_t tablet_id : tablet_ids) { + shard_groups[get_shard_index(tablet_id)].push_back(tablet_id); + } + + // Process each shard + for (size_t i = 0; i < SHARD_COUNT; ++i) { + if (shard_groups[i].empty()) continue; + + auto& shard = _balanced_tablets_shards[i]; + std::lock_guard lock(shard.mtx); + for (int64_t tablet_id : shard_groups[i]) { + auto it = shard.tablets.find(tablet_id); + if (it != shard.tablets.end()) { + shard.tablets.erase(it); + g_balance_tablet_be_mapping_size << -1; + VLOG_DEBUG << "Removed balanced warm up cache tablet: tablet_id=" << tablet_id; + } + } + } +} + +std::unordered_map> +CloudWarmUpManager::get_all_balanced_tablets() const { + std::unordered_map> result; + + // Lock all shards to get consistent snapshot + std::array, SHARD_COUNT> locks; + for (size_t i = 0; i < SHARD_COUNT; ++i) { + locks[i] = std::unique_lock(_balanced_tablets_shards[i].mtx); + } + + for (const auto& shard : _balanced_tablets_shards) { + for (const auto& [tablet_id, entry] : shard.tablets) { + result.emplace(tablet_id, std::make_pair(entry.be_ip, entry.brpc_port)); + } + } + return result; +} + +#include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h index ad5f268acb78e9..36bdba4acae48a 100644 --- a/be/src/cloud/cloud_warm_up_manager.h +++ b/be/src/cloud/cloud_warm_up_manager.h @@ -47,6 +47,10 @@ struct JobMeta { std::vector tablet_ids; }; +// manager for +// table warm up +// cluster warm up +// balance peer addr cache class CloudWarmUpManager { public: explicit CloudWarmUpManager(CloudStorageEngine& engine); @@ -84,6 +88,14 @@ class CloudWarmUpManager { void recycle_cache(int64_t tablet_id, const std::vector& rowsets); + // Balance warm up cache management methods + void record_balanced_tablet(int64_t tablet_id, const std::string& host, int32_t brpc_port); + std::optional> get_balanced_tablet_info(int64_t tablet_id); + void remove_balanced_tablet(int64_t tablet_id); + void remove_balanced_tablets(const std::vector& tablet_ids); + bool is_balanced_tablet_expired(const std::chrono::system_clock::time_point& ctime) const; + std::unordered_map> get_all_balanced_tablets() const; + private: void handle_jobs(); @@ -114,6 +126,22 @@ class CloudWarmUpManager { using Cache = std::unordered_map; // job_id -> cache std::unordered_map _tablet_replica_cache; + + // Sharded lock for better performance + static constexpr size_t SHARD_COUNT = 10240; + struct Shard { + mutable std::mutex mtx; + std::unordered_map tablets; + }; + std::array _balanced_tablets_shards; + + // Helper methods for shard operations + size_t get_shard_index(int64_t tablet_id) const { + return std::hash {}(tablet_id) % SHARD_COUNT; + } + Shard& get_shard(int64_t tablet_id) { + return _balanced_tablets_shards[get_shard_index(tablet_id)]; + } }; } // namespace doris diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index ff56fcae0dfbc3..53fabb2fd3dde9 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -120,5 +120,11 @@ DEFINE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms, "120000"); DEFINE_mBool(enable_warmup_immediately_on_new_rowset, "false"); +DEFINE_mBool(enable_cache_read_from_peer, "false"); + +// Cache the expiration time of the peer address. +// This can be configured to be less than the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration. +// If the value is -1, use the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration as the expiration time. +DEFINE_mInt64(cache_read_from_peer_expired_seconds, "-1"); #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index bc38d34637064f..82add5245fd7ed 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -159,5 +159,9 @@ DECLARE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms); DECLARE_mBool(enable_warmup_immediately_on_new_rowset); +DECLARE_mBool(enable_cache_read_from_peer); + +DECLARE_mInt64(cache_read_from_peer_expired_seconds); + #include "common/compile_check_end.h" } // namespace doris::config diff --git a/be/src/http/action/file_cache_action.cpp b/be/src/http/action/file_cache_action.cpp index 740bac46edf2a7..4b9310435a2297 100644 --- a/be/src/http/action/file_cache_action.cpp +++ b/be/src/http/action/file_cache_action.cpp @@ -72,6 +72,11 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri json[RELEASED_ELEMENTS.data()] = released; *json_metrics = json.ToString(); } else if (operation == CLEAR) { + DBUG_EXECUTE_IF("FileCacheAction._handle_header.ignore_clear", { + LOG_WARNING("debug point FileCacheAction._handle_header.ignore_clear"); + st = Status::OK(); + return st; + }); const std::string& sync = req->param(SYNC.data()); const std::string& segment_path = req->param(VALUE.data()); if (segment_path.empty()) { diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index ebb968e4f16983..108979118601c4 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -20,7 +20,9 @@ #include "io/cache/block_file_cache_downloader.h" +#include #include +#include #include #include #include @@ -30,6 +32,7 @@ #include #include "cloud/cloud_tablet_mgr.h" +#include "cloud/cloud_warm_up_manager.h" #include "common/config.h" #include "common/logging.h" #include "cpp/sync_point.h" @@ -169,6 +172,14 @@ std::unordered_map snapshot_rs_metas(BaseTable return id_to_rowset_meta_map; } +static void clean_up_expired_mappings(void* arg) { + // Reclaim ownership with unique_ptr for automatic memory management + std::unique_ptr tablet_id(static_cast(arg)); + auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); + manager.remove_balanced_tablet(*tablet_id); + VLOG_DEBUG << "Removed expired balanced warm up cache tablet: tablet_id=" << *tablet_id; +} + void FileCacheBlockDownloader::download_file_cache_block( const DownloadTask::FileCacheBlockMetaVec& metas) { std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) { @@ -215,8 +226,27 @@ void FileCacheBlockDownloader::download_file_cache_block( << "]"; } } + // Use std::make_unique to avoid raw pointer allocation + auto tablet_id_ptr = std::make_unique(tablet_id); + unsigned long expired_ms = g_tablet_report_inactive_duration_ms; + if (doris::config::cache_read_from_peer_expired_seconds > 0 && + doris::config::cache_read_from_peer_expired_seconds <= + g_tablet_report_inactive_duration_ms / 1000) { + expired_ms = doris::config::cache_read_from_peer_expired_seconds * 1000; + } + bthread_timer_t timer_id; + // ATTN: The timer callback will reclaim ownership of the tablet_id_ptr, so we need to release it after the timer is added. + if (const int rc = + bthread_timer_add(&timer_id, butil::milliseconds_from_now(expired_ms), + clean_up_expired_mappings, tablet_id_ptr.get()); + rc == 0) { + tablet_id_ptr.release(); + } else { + LOG(WARNING) << "Fail to add timer for clean up expired mappings for tablet_id=" + << tablet_id << " rc=" << rc; + } LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id - << "status=" << st.to_string(); + << " status=" << st.to_string() << " expired_ms=" << expired_ms; }; std::string path; @@ -313,9 +343,16 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met void FileCacheBlockDownloader::download_blocks(DownloadTask& task) { switch (task.task_message.index()) { - case 0: - download_file_cache_block(std::get<0>(task.task_message)); + case 0: { + bool should_balance_task = true; + DBUG_EXECUTE_IF("FileCacheBlockDownloader.download_blocks.balance_task", + { should_balance_task = false; }); + if (should_balance_task) { + download_file_cache_block(std::get<0>(task.task_message)); + } + break; + } case 1: download_segment_file(std::get<1>(task.task_message)); break; diff --git a/be/src/io/cache/block_file_cache_factory.cpp b/be/src/io/cache/block_file_cache_factory.cpp index 8b84e6854ef83f..2d7f4eb49fdf41 100644 --- a/be/src/io/cache/block_file_cache_factory.cpp +++ b/be/src/io/cache/block_file_cache_factory.cpp @@ -41,6 +41,7 @@ #include "io/fs/local_file_system.h" #include "runtime/exec_env.h" #include "service/backend_options.h" +#include "util/slice.h" #include "vec/core/block.h" namespace doris { @@ -119,6 +120,40 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path, return Status::OK(); } +std::vector FileCacheFactory::get_cache_data_by_path(const std::string& path) { + auto cache_hash = BlockFileCache::hash(path); + return get_cache_data_by_path(cache_hash); +} + +std::vector FileCacheFactory::get_cache_data_by_path( + const UInt128Wrapper& hash) { + std::vector ret; + BlockFileCache* cache = FileCacheFactory::instance()->get_by_path(hash); + if (cache == nullptr) { + return ret; + } + auto blocks = cache->get_blocks_by_key(hash); + for (auto& [offset, fb] : blocks) { + doris::CacheBlockPB cb; + cb.set_block_offset(static_cast(offset)); + cb.set_block_size(static_cast(fb->range().size())); + // try to read data into bytes + std::string data; + data.resize(fb->range().size()); + Slice slice(data.data(), data.size()); + // read from beginning of this block + Status st = fb->read(slice, /*read_offset=*/0); + if (st.ok()) { + cb.set_data(data); + } else { + // On read failure, skip setting data but still report meta + VLOG_DEBUG << "read cache block failed: " << st; + } + ret.emplace_back(std::move(cb)); + } + return ret; +} + std::vector FileCacheFactory::get_cache_file_by_path(const UInt128Wrapper& hash) { io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash); auto blocks = cache->get_blocks_by_key(hash); diff --git a/be/src/io/cache/block_file_cache_factory.h b/be/src/io/cache/block_file_cache_factory.h index 4366afc72eb082..30d69dc15ca312 100644 --- a/be/src/io/cache/block_file_cache_factory.h +++ b/be/src/io/cache/block_file_cache_factory.h @@ -27,6 +27,7 @@ #include #include "common/status.h" +#include "gen_cpp/internal_service.pb.h" #include "io/cache/block_file_cache.h" #include "io/cache/file_cache_common.h" namespace doris { @@ -65,6 +66,11 @@ class FileCacheFactory { std::vector get_cache_file_by_path(const UInt128Wrapper& hash); int64_t get_cache_file_size_by_path(const UInt128Wrapper& hash); + // Return cached blocks data for a given key hash + std::vector get_cache_data_by_path(const UInt128Wrapper& hash); + // Convenience overload: compute hash from path and return cached blocks data + std::vector get_cache_data_by_path(const std::string& path); + BlockFileCache* get_by_path(const UInt128Wrapper& hash); BlockFileCache* get_by_path(const std::string& cache_base_path); std::vector get_query_context_holders( diff --git a/be/src/io/cache/block_file_cache_profile.cpp b/be/src/io/cache/block_file_cache_profile.cpp index 1759d37f9e4314..68d74f8525761d 100644 --- a/be/src/io/cache/block_file_cache_profile.cpp +++ b/be/src/io/cache/block_file_cache_profile.cpp @@ -30,6 +30,7 @@ std::shared_ptr FileCacheProfile::report() { std::lock_guard lock(_mtx); stats->num_io_bytes_read_from_cache += _profile->num_io_bytes_read_from_cache; stats->num_io_bytes_read_from_remote += _profile->num_io_bytes_read_from_remote; + stats->num_io_bytes_read_from_peer += _profile->num_io_bytes_read_from_peer; return stats; } @@ -44,6 +45,7 @@ void FileCacheProfile::update(FileCacheStatistics* stats) { } _profile->num_io_bytes_read_from_cache += stats->bytes_read_from_local; _profile->num_io_bytes_read_from_remote += stats->bytes_read_from_remote; + _profile->num_io_bytes_read_from_peer += stats->bytes_read_from_peer; } void FileCacheMetric::register_entity() { @@ -57,8 +59,11 @@ void FileCacheMetric::update_table_metrics() const { stats->num_io_bytes_read_from_cache); DorisMetrics::instance()->num_io_bytes_read_from_remote->set_value( stats->num_io_bytes_read_from_remote); + DorisMetrics::instance()->num_io_bytes_read_from_peer->set_value( + stats->num_io_bytes_read_from_peer); DorisMetrics::instance()->num_io_bytes_read_total->set_value( - stats->num_io_bytes_read_from_cache + stats->num_io_bytes_read_from_remote); + stats->num_io_bytes_read_from_cache + stats->num_io_bytes_read_from_remote + + stats->num_io_bytes_read_from_peer); } } // namespace doris::io diff --git a/be/src/io/cache/block_file_cache_profile.h b/be/src/io/cache/block_file_cache_profile.h index 583e1287c05ad2..3e2492da51593f 100644 --- a/be/src/io/cache/block_file_cache_profile.h +++ b/be/src/io/cache/block_file_cache_profile.h @@ -37,6 +37,7 @@ namespace io { struct AtomicStatistics { std::atomic num_io_bytes_read_from_cache = 0; std::atomic num_io_bytes_read_from_remote = 0; + std::atomic num_io_bytes_read_from_peer = 0; }; struct FileCacheProfile; @@ -75,10 +76,13 @@ struct FileCacheProfile { struct FileCacheProfileReporter { RuntimeProfile::Counter* num_local_io_total = nullptr; RuntimeProfile::Counter* num_remote_io_total = nullptr; + RuntimeProfile::Counter* num_peer_io_total = nullptr; RuntimeProfile::Counter* local_io_timer = nullptr; RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr; RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr; + RuntimeProfile::Counter* bytes_scanned_from_peer = nullptr; RuntimeProfile::Counter* remote_io_timer = nullptr; + RuntimeProfile::Counter* peer_io_timer = nullptr; RuntimeProfile::Counter* write_cache_io_timer = nullptr; RuntimeProfile::Counter* bytes_write_into_cache = nullptr; RuntimeProfile::Counter* num_skip_cache_io_total = nullptr; @@ -90,10 +94,13 @@ struct FileCacheProfileReporter { RuntimeProfile::Counter* inverted_index_num_local_io_total = nullptr; RuntimeProfile::Counter* inverted_index_num_remote_io_total = nullptr; + RuntimeProfile::Counter* inverted_index_num_peer_io_total = nullptr; RuntimeProfile::Counter* inverted_index_bytes_scanned_from_cache = nullptr; RuntimeProfile::Counter* inverted_index_bytes_scanned_from_remote = nullptr; + RuntimeProfile::Counter* inverted_index_bytes_scanned_from_peer = nullptr; RuntimeProfile::Counter* inverted_index_local_io_timer = nullptr; RuntimeProfile::Counter* inverted_index_remote_io_timer = nullptr; + RuntimeProfile::Counter* inverted_index_peer_io_timer = nullptr; RuntimeProfile::Counter* inverted_index_io_timer = nullptr; FileCacheProfileReporter(RuntimeProfile* profile) { diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index baf5a4792ea2a6..44c5b25e225f17 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -17,15 +17,24 @@ #include "io/cache/cached_remote_file_reader.h" +#include #include #include +#include #include -#include #include +#include +#include +#include +#include #include +#include +#include +#include #include +#include "cloud/cloud_warm_up_manager.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "cpp/sync_point.h" @@ -33,16 +42,26 @@ #include "io/cache/block_file_cache_factory.h" #include "io/cache/block_file_cache_profile.h" #include "io/cache/file_block.h" +#include "io/cache/peer_file_cache_reader.h" #include "io/fs/file_reader.h" #include "io/fs/local_file_system.h" #include "io/io_common.h" +#include "olap/storage_policy.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/thread_context.h" +#include "runtime/workload_management/io_throttle.h" +#include "service/backend_options.h" #include "util/bit_util.h" +#include "util/brpc_client_cache.h" // BrpcClientCache +#include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/runtime_profile.h" namespace doris::io { bvar::Adder s3_read_counter("cached_remote_reader_s3_read"); +bvar::Adder peer_read_counter("cached_remote_reader_peer_read"); bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num"); bvar::Adder g_skip_cache_sum("cached_remote_reader_skip_cache_sum"); bvar::Adder g_skip_local_cache_io_sum_bytes( @@ -63,6 +82,8 @@ bvar::Window> g_read_cache_indirect_bytes_1min_window( bvar::Window> g_read_cache_indirect_total_bytes_1min_window( "cached_remote_reader_indirect_total_bytes_1min_window", &g_read_cache_indirect_total_bytes, 60); +bvar::Adder g_failed_get_peer_addr_counter( + "cached_remote_reader_failed_get_peer_addr_counter"); CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const FileReaderOptions& opts) @@ -131,6 +152,126 @@ std::pair CachedRemoteFileReader::s_align_size(size_t offset, si return std::make_pair(align_left, align_size); } +namespace { +std::optional extract_tablet_id(const std::string& file_path) { + return StorageResource::parse_tablet_id_from_path(file_path); +} + +// Get peer connection info from tablet_id +std::pair get_peer_connection_info(const std::string& file_path) { + std::string host = ""; + int port = 0; + + // Try to get tablet_id from actual path and lookup tablet info + if (auto tablet_id = extract_tablet_id(file_path)) { + auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); + if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) { + host = tablet_info->first; + port = tablet_info->second; + } else { + LOG_WARNING("get peer connection info not found") + .tag("tablet_id", *tablet_id) + .tag("file_path", file_path); + } + } else { + LOG_WARNING("parse tablet id from path failed") + .tag("tablet_id", "null") + .tag("file_path", file_path); + } + + DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", { + host = dp->param("host", "127.0.0.1"); + port = dp->param("port", 9060); + LOG_WARNING("debug point PeerFileCacheReader::_fetch_from_peer_cache_blocks") + .tag("host", host) + .tag("port", port); + }); + + return {host, port}; +} + +// Execute peer read with fallback to S3 +Status execute_peer_read(const std::vector& empty_blocks, size_t empty_start, + size_t& size, std::unique_ptr& buffer, + const std::string& file_path, bool is_doris_table, ReadStatistics& stats, + const IOContext* io_ctx) { + auto [host, port] = get_peer_connection_info(file_path); + VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port + << ", file_path=" << file_path; + + if (host.empty() || port == 0) { + g_failed_get_peer_addr_counter << 1; + LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is empty" + << ", host=" << host << ", port=" << port + << ", file_path=" << file_path; + return Status::InternalError("host or port is empty"); + } + SCOPED_RAW_TIMER(&stats.peer_read_timer); + peer_read_counter << 1; + PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port); + auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, Slice(buffer.get(), size), &size, + io_ctx); + if (!st.ok()) { + LOG_WARNING("PeerFileCacheReader read from peer failed") + .tag("host", host) + .tag("port", port) + .tag("error", st.msg()); + } + stats.from_peer_cache = true; + return st; +} + +// Execute S3 read +Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr& buffer, + ReadStatistics& stats, const IOContext* io_ctx, + FileReaderSPtr remote_file_reader) { + s3_read_counter << 1; + SCOPED_RAW_TIMER(&stats.remote_read_timer); + stats.from_peer_cache = false; + return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx); +} + +} // anonymous namespace + +Status CachedRemoteFileReader::_execute_remote_read(const std::vector& empty_blocks, + size_t empty_start, size_t& size, + std::unique_ptr& buffer, + ReadStatistics& stats, + const IOContext* io_ctx) { + DBUG_EXECUTE_IF("CachedRemoteFileReader.read_at_impl.change_type", { + // Determine read type from debug point or default to S3 + std::string read_type = "s3"; + read_type = dp->param("type", "s3"); + LOG_WARNING("CachedRemoteFileReader.read_at_impl.change_type") + .tag("path", path().native()) + .tag("off", empty_start) + .tag("size", size) + .tag("type", read_type); + // Execute appropriate read strategy + if (read_type == "s3") { + return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); + } else { + return execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(), + _is_doris_table, stats, io_ctx); + } + }); + + if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) { + return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); + } else { + // first try peer read, if peer failed, fallback to S3 + // peer timeout is 5 seconds + // TODO(dx): here peer and s3 reader need to get data in parallel, and take the one that is correct and returns first + auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(), + _is_doris_table, stats, io_ctx); + if (!st.ok()) { + // Fallback to S3 + return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); + } + return st; + } +} + Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) { size_t already_read = 0; @@ -148,6 +289,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* *bytes_read = 0; return Status::OK(); } + ReadStatistics stats; MonotonicStopWatch read_at_sw; read_at_sw.start(); @@ -267,12 +409,11 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* empty_end = empty_blocks.back()->range().right; size_t size = empty_end - empty_start + 1; std::unique_ptr buffer(new char[size]); - { - s3_read_counter << 1; - SCOPED_RAW_TIMER(&stats.remote_read_timer); - RETURN_IF_ERROR(_remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), - &size, io_ctx)); - } + + // Determine read type and execute remote read + RETURN_IF_ERROR( + _execute_remote_read(empty_blocks, empty_start, size, buffer, stats, io_ctx)); + for (auto& block : empty_blocks) { if (block->state() == FileBlock::State::SKIP_CACHE) { continue; @@ -363,6 +504,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* << st.msg() << ", block state=" << block_state; size_t bytes_read {0}; stats.hit_cache = false; + stats.from_peer_cache = false; s3_read_counter << 1; SCOPED_RAW_TIMER(&stats.remote_read_timer); RETURN_IF_ERROR(_remote_file_reader->read_at( @@ -392,8 +534,15 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, statis->num_local_io_total++; statis->bytes_read_from_local += read_stats.bytes_read; } else { - statis->num_remote_io_total++; - statis->bytes_read_from_remote += read_stats.bytes_read; + if (read_stats.from_peer_cache) { + statis->num_peer_io_total++; + statis->bytes_read_from_peer += read_stats.bytes_read; + statis->peer_io_timer += read_stats.peer_read_timer; + } else { + statis->num_remote_io_total++; + statis->bytes_read_from_remote += read_stats.bytes_read; + statis->remote_io_timer += read_stats.remote_read_timer; + } } statis->remote_io_timer += read_stats.remote_read_timer; statis->local_io_timer += read_stats.local_read_timer; @@ -412,11 +561,17 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, statis->inverted_index_num_local_io_total++; statis->inverted_index_bytes_read_from_local += read_stats.bytes_read; } else { - statis->inverted_index_num_remote_io_total++; - statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read; + if (read_stats.from_peer_cache) { + statis->inverted_index_num_peer_io_total++; + statis->inverted_index_bytes_read_from_peer += read_stats.bytes_read; + statis->inverted_index_peer_io_timer += read_stats.peer_read_timer; + } else { + statis->inverted_index_num_remote_io_total++; + statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read; + statis->inverted_index_remote_io_timer += read_stats.remote_read_timer; + } } statis->inverted_index_local_io_timer += read_stats.local_read_timer; - statis->inverted_index_remote_io_timer += read_stats.remote_read_timer; } g_skip_cache_sum << read_stats.skip_cache; diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index 94e8a5807ba273..939471b62ea41d 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "common/status.h" #include "io/cache/block_file_cache.h" @@ -60,15 +61,21 @@ class CachedRemoteFileReader final : public FileReader { private: void _insert_file_reader(FileBlockSPtr file_block); + + // Execute remote read (S3 or peer). + Status _execute_remote_read(const std::vector& empty_blocks, size_t empty_start, + size_t& size, std::unique_ptr& buffer, + ReadStatistics& stats, const IOContext* io_ctx); + + void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state, + bool is_inverted_index) const; + bool _is_doris_table; FileReaderSPtr _remote_file_reader; UInt128Wrapper _cache_hash; BlockFileCache* _cache; std::shared_mutex _mtx; std::map _cache_file_readers; - - void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state, - bool is_inverted_index) const; }; } // namespace doris::io diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h index abbc4ff12fb735..96894ac74e5310 100644 --- a/be/src/io/cache/file_cache_common.h +++ b/be/src/io/cache/file_cache_common.h @@ -61,10 +61,12 @@ struct UInt128Wrapper { struct ReadStatistics { bool hit_cache = true; + bool from_peer_cache = false; bool skip_cache = false; int64_t bytes_read = 0; int64_t bytes_write_into_file_cache = 0; int64_t remote_read_timer = 0; + int64_t peer_read_timer = 0; int64_t local_read_timer = 0; int64_t local_write_timer = 0; int64_t read_cache_file_directly_timer = 0; diff --git a/be/src/io/cache/peer_file_cache_reader.cpp b/be/src/io/cache/peer_file_cache_reader.cpp new file mode 100644 index 00000000000000..c034cdce110324 --- /dev/null +++ b/be/src/io/cache/peer_file_cache_reader.cpp @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "io/cache/peer_file_cache_reader.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "runtime/exec_env.h" +#include "runtime/thread_context.h" +#include "util/brpc_client_cache.h" +#include "util/bvar_helper.h" +#include "util/debug_points.h" +#include "util/defer_op.h" +#include "util/doris_metrics.h" +#include "util/network_util.h" +#include "util/runtime_profile.h" + +namespace doris::io { +// read from peer + +bvar::Adder peer_cache_reader_failed_counter("peer_cache_reader", "failed_counter"); +bvar::Adder peer_cache_reader_succ_counter("peer_cache_reader", "succ_counter"); +bvar::LatencyRecorder peer_bytes_per_read("peer_cache_reader", "bytes_per_read"); // also QPS +bvar::Adder peer_cache_reader_total("peer_cache_reader", "total_num"); +bvar::Adder peer_cache_being_read("peer_cache_reader", "file_being_read"); +bvar::Adder peer_cache_reader_read_counter("peer_cache_reader", "read_at"); +bvar::LatencyRecorder peer_cache_reader_latency("peer_cache_reader", "peer_latency"); +bvar::PerSecond> peer_get_request_qps("peer_cache_reader", "peer_get_request", + &peer_cache_reader_read_counter); +bvar::Adder peer_bytes_read_total("peer_cache_reader", "bytes_read"); +bvar::PerSecond> peer_read_througthput("peer_cache_reader", + "peer_read_throughput", + &peer_bytes_read_total); + +PeerFileCacheReader::PeerFileCacheReader(const io::Path& file_path, bool is_doris_table, + std::string host, int port) + : _path(file_path), _is_doris_table(is_doris_table), _host(host), _port(port) { + peer_cache_reader_total << 1; + peer_cache_being_read << 1; +} + +PeerFileCacheReader::~PeerFileCacheReader() { + peer_cache_being_read << -1; +} + +Status PeerFileCacheReader::fetch_blocks(const std::vector& blocks, size_t off, + Slice s, size_t* bytes_read, const IOContext* ctx) { + VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off + << " bytes_read=" << *bytes_read; + *bytes_read = 0; + if (blocks.empty()) { + return Status::OK(); + } + if (!_is_doris_table) { + return Status::NotSupported("peer cache fetch only supports doris table segments"); + } + + PFetchPeerDataRequest req; + req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK); + req.set_path(_path.filename().native()); + for (const auto& blk : blocks) { + auto* cb = req.add_cache_req(); + cb->set_block_offset(static_cast(blk->range().left)); + cb->set_block_size(static_cast(blk->range().size())); + } + + std::string realhost = _host; + int port = _port; + + auto dns_cache = ExecEnv::GetInstance()->dns_cache(); + if (dns_cache == nullptr) { + LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve"; + } else if (!is_valid_ip(realhost)) { + Status status = dns_cache->get(_host, &realhost); + if (!status.ok()) { + peer_cache_reader_failed_counter << 1; + LOG(WARNING) << "failed to get ip from host " << _host << ": " << status.to_string(); + return Status::InternalError("failed to get ip from host {}", _host); + } + } + std::string brpc_addr = get_host_port(realhost, port); + Status st = Status::OK(); + std::shared_ptr brpc_stub = + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache( + brpc_addr); + if (!brpc_stub) { + peer_cache_reader_failed_counter << 1; + LOG(WARNING) << "failed to get brpc stub " << brpc_addr; + st = Status::RpcError("Address {} is wrong", brpc_addr); + return st; + } + LIMIT_REMOTE_SCAN_IO(bytes_read); + int64_t begin_ts = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + Defer defer_latency {[&]() { + int64_t end_ts = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + peer_cache_reader_latency << (end_ts - begin_ts); + }}; + + brpc::Controller cntl; + cntl.set_timeout_ms(5000); + PFetchPeerDataResponse resp; + peer_cache_reader_read_counter << 1; + brpc_stub->fetch_peer_data(&cntl, &req, &resp, nullptr); + if (cntl.Failed()) { + return Status::RpcError(cntl.ErrorText()); + } + if (resp.has_status()) { + Status st2 = Status::create(resp.status()); + if (!st2.ok()) return st2; + } + + size_t filled = 0; + for (const auto& data : resp.datas()) { + if (data.data().empty()) { + peer_cache_reader_failed_counter << 1; + LOG(WARNING) << "peer cache read empty data" << data.block_offset(); + return Status::InternalError("peer cache read empty data"); + } + int64_t block_off = data.block_offset(); + size_t rel = block_off > static_cast(off) + ? static_cast(block_off - static_cast(off)) + : 0; + size_t can_copy = std::min(s.size - rel, static_cast(data.data().size())); + VLOG_DEBUG << "peer cache read data=" << data.block_offset() + << " size=" << data.data().size() << " off=" << rel << " can_copy=" << can_copy; + std::memcpy(s.data + rel, data.data().data(), can_copy); + filled += can_copy; + } + VLOG_DEBUG << "peer cache read filled=" << filled; + peer_bytes_read_total << filled; + *bytes_read = filled; + peer_bytes_per_read << filled; + if (filled != s.size) { + peer_cache_reader_failed_counter << 1; + return Status::InternalError("peer cache read incomplete: need={}, got={}", s.size, filled); + } + peer_cache_reader_succ_counter << 1; + return Status::OK(); +} + +} // namespace doris::io diff --git a/be/src/io/cache/peer_file_cache_reader.h b/be/src/io/cache/peer_file_cache_reader.h new file mode 100644 index 00000000000000..f028bc2cd919b6 --- /dev/null +++ b/be/src/io/cache/peer_file_cache_reader.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "common/status.h" +#include "io/cache/file_block.h" +#include "io/fs/file_reader.h" +#include "io/fs/file_system.h" +#include "io/fs/path.h" +#include "io/fs/s3_file_system.h" +#include "util/slice.h" + +namespace doris { +class RuntimeProfile; + +namespace io { +struct IOContext; + +class PeerFileCacheReader final { +public: + /** + * Construct a peer file cache reader bound to a specific file and peer endpoint. + * + * Params: + * - file_path: Path of the target file whose cache blocks will be fetched from a peer. + * - is_doris_table: Whether the target file is a Doris table segment; only true is supported. + * - host: Peer hostname or IP address to fetch from. + * - port: Peer BRPC service port. + */ + PeerFileCacheReader(const io::Path& file_path, bool is_doris_table, std::string host, int port); + ~PeerFileCacheReader(); + /** + * Fetch data blocks from a peer and write them into the provided buffer. + * + * Behavior: + * - Supports only Doris table segment files (is_doris_table=true); otherwise returns NotSupported. + * - Builds a BRPC request to invoke peer fetch_peer_data using the given blocks. + * - Copies returned block data into the contiguous buffer Slice s, using 'off' as the base offset. + * - Succeeds only if exactly s.size bytes are written; otherwise returns an Incomplete error. + * + * Params: + * - blocks: List of file blocks to fetch (global file offsets, inclusive ranges). + * - off: Base file offset corresponding to the start of Slice s. + * - s: Destination buffer; must be large enough to hold all requested block bytes. + * - n: Output number of bytes successfully written. + * - ctx: IO context (kept for interface symmetry). + * + * Returns: + * - OK: Successfully wrote exactly s.size bytes into the buffer. + * - NotSupported: The file is not a Doris table segment. + */ + Status fetch_blocks(const std::vector& blocks, size_t off, Slice s, + size_t* bytes_read, const IOContext* ctx); + +private: + io::Path _path; + bool _is_doris_table {false}; + std::string _host = "127.0.0.1"; + int _port = 9060; +}; + +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h index 79efa500c0677f..e6d8527e831906 100644 --- a/be/src/io/fs/file_reader.h +++ b/be/src/io/fs/file_reader.h @@ -57,6 +57,8 @@ struct FileReaderOptions { int64_t file_size = -1; // Use modification time to determine whether the file is changed int64_t mtime = 0; + // Used to query the location of the file cache + int64_t tablet_id = -1; static const FileReaderOptions DEFAULT; }; diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index 1778b50af731f6..27426484c206db 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -57,6 +57,7 @@ bvar::PerSecond> s3_read_througthput("s3_file_reader", "s3 // record successfull request, and s3_get_request_qps will record all request. bvar::PerSecond> s3_get_request_qps("s3_file_reader", "s3_get_request", &s3_file_reader_read_counter); +bvar::LatencyRecorder s3_file_reader_latency("s3_file_reader", "s3_latency"); Result S3FileReader::create(std::shared_ptr client, std::string bucket, std::string key, int64_t file_size, @@ -114,6 +115,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea size_t bytes_req = result.size; char* to = result.data; bytes_req = std::min(bytes_req, _file_size - offset); + VLOG_DEBUG << "enter s3 read_at_impl, off=" << offset << " n=" << bytes_req + << " req=" << result.size << " file size=" << _file_size; if (UNLIKELY(bytes_req == 0)) { *bytes_read = 0; return Status::OK(); @@ -129,16 +132,25 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds const int max_retries = config::max_s3_client_retry; // wait 1s, 2s, 4s, 8s for each backoff + int64_t begin_ts = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); LIMIT_REMOTE_SCAN_IO(bytes_read); - DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", { auto sleep_time = dp->param("sleep", 3); - LOG_INFO("S3FileReader::read_at_impl.io_slow inject sleep {} s", sleep_time) + LOG_INFO("S3FileReader::read_at_impl.io_slow inject microseconds {} s", sleep_time) .tag("bucket", _bucket) .tag("key", _key); - std::this_thread::sleep_for(std::chrono::seconds(sleep_time)); + std::this_thread::sleep_for(std::chrono::microseconds(sleep_time)); }); SCOPED_RAW_TIMER(&_s3_stats.total_get_request_time_ns); + Defer defer_latency {[&]() { + int64_t end_ts = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + s3_file_reader_latency << (end_ts - begin_ts); + }}; + int total_sleep_time = 0; while (retry_count <= max_retries) { *bytes_read = 0; diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h index 82e9ae30ecada2..1dca5e62fccc99 100644 --- a/be/src/io/io_common.h +++ b/be/src/io/io_common.h @@ -45,10 +45,13 @@ struct FileReaderStats { struct FileCacheStatistics { int64_t num_local_io_total = 0; int64_t num_remote_io_total = 0; + int64_t num_peer_io_total = 0; int64_t local_io_timer = 0; int64_t bytes_read_from_local = 0; int64_t bytes_read_from_remote = 0; + int64_t bytes_read_from_peer = 0; int64_t remote_io_timer = 0; + int64_t peer_io_timer = 0; int64_t write_cache_io_timer = 0; int64_t bytes_write_into_cache = 0; int64_t num_skip_cache_io_total = 0; @@ -60,10 +63,13 @@ struct FileCacheStatistics { int64_t inverted_index_num_local_io_total = 0; int64_t inverted_index_num_remote_io_total = 0; + int64_t inverted_index_num_peer_io_total = 0; int64_t inverted_index_bytes_read_from_local = 0; int64_t inverted_index_bytes_read_from_remote = 0; + int64_t inverted_index_bytes_read_from_peer = 0; int64_t inverted_index_local_io_timer = 0; int64_t inverted_index_remote_io_timer = 0; + int64_t inverted_index_peer_io_timer = 0; int64_t inverted_index_io_timer = 0; }; diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 3f7ae765c17519..ab7b79d3b736be 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -177,7 +177,8 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se : io::FileCachePolicy::NO_CACHE, .is_doris_table = true, .cache_base_path = "", - .file_size = _rowset_meta->segment_file_size(seg_id), + .file_size = _rowset_meta->segment_file_size(static_cast(seg_id)), + .tablet_id = _rowset_meta->tablet_id(), }; auto s = segment_v2::Segment::open(fs, seg_path, _rowset_meta->tablet_id(), seg_id, rowset_id(), diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp index b3c132b68e89bb..a8ab43e76202bd 100644 --- a/be/src/olap/storage_policy.cpp +++ b/be/src/olap/storage_policy.cpp @@ -21,7 +21,9 @@ #include #include +#include #include +#include #include #include "gen_cpp/cloud.pb.h" @@ -196,6 +198,90 @@ std::string StorageResource::remote_segment_path(const RowsetMeta& rowset, int64 } } +// TODO(dx) +// fix this, it is a tricky function. Pass the upper layer's tablet ID to the io layer instead of using this tricky method +// Tricky, It is used to parse tablet_id from remote segment path, and it is used in tablet manager to parse tablet_id from remote segment path. +// Static function to parse tablet_id from remote segment path +std::optional StorageResource::parse_tablet_id_from_path(const std::string& path) { + // Expected path formats: + // support both .dat and .idx file extensions + // support formate see ut. storage_resource_test:StorageResourceTest.ParseTabletIdFromPath + + if (path.empty()) { + return std::nullopt; + } + + // Find the position of "data/" in the path + std::string_view path_view = path; + std::string_view data_prefix = DATA_PREFIX; + size_t data_pos = path_view.find(data_prefix); + if (data_pos == std::string_view::npos) { + return std::nullopt; + } + + // Extract the part after "data/" + path_view = path_view.substr(data_pos + data_prefix.length() + 1); + + // Check if path ends with .dat or .idx + if (!path_view.ends_with(".dat") && !path_view.ends_with(".idx")) { + return std::nullopt; + } + + // Count slashes in the remaining path + size_t slash_count = 0; + for (char c : path_view) { + if (c == '/') { + slash_count++; + } + } + + // Split path by '/' + std::vector parts; + size_t start = 0; + size_t pos = 0; + while ((pos = path_view.find('/', start)) != std::string_view::npos) { + if (pos > start) { + parts.push_back(path_view.substr(start, pos - start)); + } + start = pos + 1; + } + if (start < path_view.length()) { + parts.push_back(path_view.substr(start)); + } + + if (parts.empty()) { + return std::nullopt; + } + + // Determine path version based on slash count and extract tablet_id + // Version 0: {tablet_id}/{rowset_id}_{seg_id}.dat (1 slash) + // Version 1: {shard}/{tablet_id}/{rowset_id}/{seg_id}.dat (3 slashes) + + if (slash_count == 1) { + // Version 0 format: parts[0] should be tablet_id + if (parts.size() >= 1) { + try { + int64_t tablet_id = std::stoll(std::string(parts[0])); + return tablet_id; + } catch (const std::exception&) { + // Not a valid number, return nullopt at last + } + } + } else if (slash_count == 3) { + // Version 1 format: parts[1] should be tablet_id (parts[0] is shard) + if (parts.size() >= 2) { + try { + int64_t tablet_id = std::stoll(std::string(parts[1])); + return tablet_id; + } catch (const std::exception&) { + // Not a valid number, return nullopt at last + } + } + } + + return std::nullopt; +} + std::string StorageResource::remote_idx_v1_path(const RowsetMeta& rowset, int64_t seg_id, int64_t index_id, std::string_view index_path_suffix) const { diff --git a/be/src/olap/storage_policy.h b/be/src/olap/storage_policy.h index e83a447aa6a60e..805833dc54865d 100644 --- a/be/src/olap/storage_policy.h +++ b/be/src/olap/storage_policy.h @@ -84,6 +84,9 @@ struct StorageResource { std::string cooldown_tablet_meta_path(int64_t tablet_id, int64_t replica_id, int64_t cooldown_term) const; + + // Static function to parse tablet_id from remote segment path + static std::optional parse_tablet_id_from_path(const std::string& path); }; // return nullptr if not found diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 7dee4e2a9fe08f..c641938b92e30a 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -224,6 +224,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing, MetricUnit::FILESYSTEM) DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_total, MetricUnit::OPERATIONS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_cache, MetricUnit::OPERATIONS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_remote, MetricUnit::OPERATIONS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_peer, MetricUnit::OPERATIONS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT); @@ -373,6 +374,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_cache); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_remote); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_peer); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 94e13554eb7654..e1eb9f3d3c969b 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -243,6 +243,7 @@ class DorisMetrics { IntCounter* num_io_bytes_read_total = nullptr; IntCounter* num_io_bytes_read_from_cache = nullptr; IntCounter* num_io_bytes_read_from_remote = nullptr; + IntCounter* num_io_bytes_read_from_peer = nullptr; IntCounter* query_ctx_cnt = nullptr; IntCounter* scanner_ctx_cnt = nullptr; diff --git a/be/test/olap/storage_resource_test.cpp b/be/test/olap/storage_resource_test.cpp index 32c1b55f5f49c1..79ddd1d8a1e61b 100644 --- a/be/test/olap/storage_resource_test.cpp +++ b/be/test/olap/storage_resource_test.cpp @@ -65,4 +65,113 @@ TEST(StorageResourceTest, RemotePath) { ASSERT_DEATH(StorageResource(nullptr, storage_vault_pb.path_format()), "unknown"); } +TEST(StorageResourceTest, ParseTabletIdFromPath) { + // Test Version 0 format: data/{tablet_id}/{rowset_id}_{seg_id}.dat + // see function StorageResource::remote_segment_path + // fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, seg_id); + EXPECT_EQ( + StorageResource::parse_tablet_id_from_path( + "prefix_xxx/data/10005/0200000000001cc2224124562e7dfd4834d031b13c0210be_5.dat"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("//data/12345/rowset_001_0.dat"), 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/999999/rowset_abc_10.dat"), 999999); + + // Test Version 0 format with .idx files (v1 format) + // see function StorageResource::remote_idx_v1_path + // fmt::format("{}/{}/{}_{}_{}{}.idx", DATA_PREFIX, rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id, index_id, suffix); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "//data/10005/0200000000001cc2224124562e7_6_6666_suffix.idx"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "bucket_xxx/data/12345/rowsetid_1_666_suffix.idx"), + 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/999999/rowsetid_10_8888_suffix.idx"), + 999999); + + // Test Version 0 format with .idx files (v2 format) + // see function StorageResource::remote_idx_v2_path + // fmt::format("{}/{}/{}_{}.idx", DATA_PREFIX, rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "s3://prefix_bucket/data/10005/0200000000001cc2224124562e7_5.idx"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("/data/12345/rowset001_0.idx"), 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/999999/rowsetabc_10.idx"), 999999); + + // Test Version 1 format: data/{shard}/{tablet_id}/{rowset_id}/{seg_id}.dat + // see function StorageResource::remote_segment_path + // fmt::format("{}/{}/{}/{}/{}.dat", DATA_PREFIX, shard_fn(rowset.tablet_id()), rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "prefix_xxxx/data/611/10005/0200000000001cc2224124562e7dfd4834d031b13c0210be/" + "5.dat"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/0/12345/rowset_001/0.dat"), 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("s3:///data/999/999999/rowset_abc/10.dat"), + 999999); + + // Test Version 1 format with .idx files (v1 format) + // see function StorageResource::remote_idx_v1_path + // fmt::format("{}/{}/{}/{}/{}_{}{}.idx", DATA_PREFIX, shard_fn(rowset.tablet_id()), rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id, index_id, suffix); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "s3:///data/611/10005/0200000000001cc2224124562e7dfd4834d031b13c0210be/" + "5_6666_suffix.idx"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "prefix_bucket/data/0/12345/rowsetid/1_666_suffix.idx"), + 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "data/999/999999/rowsetid/10_8888_suffix.idx"), + 999999); + + // Test Version 1 format with .idx files (v2 format) + // see function StorageResource::remote_idx_v2_path + // fmt::format("{}/{}/{}/{}/{}.idx", DATA_PREFIX, shard_fn(rowset.tablet_id()), rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "s3://prefix_bucket/data/611/10005/" + "0200000000001cc2224124562e7dfd4834d031b13c0210be/5.idx"), + 10005); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("/data/0/12345/rowset001/0.idx"), 12345); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/999/999999/rowsetabc/10.idx"), + 999999); + + // Test edge cases + // fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, seg_id); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("prefix_bucket/data/0/rowset001_0.dat"), + 0); + // fmt::format("{}/{}/{}/{}/{}.dat", DATA_PREFIX, shard_fn(rowset.tablet_id()), rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("/data/0/0/rowset001/0.dat"), 0); + + // Test invalid cases + EXPECT_EQ(StorageResource::parse_tablet_id_from_path(""), std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("invalid_path"), std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/"), std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("/data/abc/rowset_001_0.dat"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "s3://prefix_bucket/data/0/abc/rowset_001/0.dat"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/10005/rowset_001_0.txt"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/10005/rowset_001_0"), std::nullopt); + + // Test paths with different slash counts (should return nullopt) + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/10005/rowset_001/extra/0.dat"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("/data/10005/rowset_001/extra/0.idx"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path( + "prefix_bucket/data/10005/rowset_001/extra/0.dat"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data/10005.dat"), std::nullopt); + + // Test paths without data prefix + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("10005/rowset_001_0.dat"), std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("0/12345/rowset_001/0.dat"), std::nullopt); + + // Test paths with leading slash after data prefix + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data//10005/rowset_001_0.dat"), + std::nullopt); + EXPECT_EQ(StorageResource::parse_tablet_id_from_path("data//0/12345/rowset_001/0.dat"), + std::nullopt); +} + } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 3cf51e31613054..8b0e351f3063f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -267,7 +267,8 @@ private HeartbeatResponse pingOnce() { if (Config.isCloudMode()) { String cloudUniqueId = backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID); copiedMasterInfo.setCloudUniqueId(cloudUniqueId); - copiedMasterInfo.setTabletReportInactiveDurationMs(Config.rehash_tablet_after_be_dead_seconds); + long reportInterval = Config.rehash_tablet_after_be_dead_seconds * 1000L; + copiedMasterInfo.setTabletReportInactiveDurationMs(reportInterval); TCloudClusterInfo clusterInfo = new TCloudClusterInfo(); clusterInfo.setIsStandby(backend.isInStandbyCluster()); copiedMasterInfo.setCloudClusterInfo(clusterInfo); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index e6502d45e061e4..0b6118790b7672 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -1031,6 +1031,40 @@ message PGetTabletRowsetsResponse { optional DeleteBitmapPB delete_bitmap = 3; } +message CacheBlockReqest { + // PEER_FILE_CACHE_BLOCK + optional int64 block_offset = 1; + optional int64 block_size = 2; +} + +message PFetchPeerDataRequest { + enum Type { + PEER_FILE_RANGE = 1; + PEER_FILE_CACHE_BLOCK = 2; + } + optional Type type = 1; + // obj path, let peer calc hash, and download file cache + // PEER_FILE_RANGE and PEER_FILE_CACHE_BLOCK use + optional string path = 2; + + // PEER_FILE_RANGE + optional int64 file_offset = 3; + optional int64 file_size = 4; + // PEER_FILE_CACHE_BLOCK + repeated CacheBlockReqest cache_req = 5; +} + +message CacheBlockPB { + optional int64 block_offset = 1; + optional int64 block_size = 2; + optional bytes data = 3; +} + +message PFetchPeerDataResponse { + optional PStatus status = 1; + repeated CacheBlockPB datas = 2; +} + service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult); @@ -1086,5 +1120,6 @@ service PBackendService { rpc alter_vault_sync(PAlterVaultSyncRequest) returns (PAlterVaultSyncResponse); rpc get_be_resource(PGetBeResourceRequest) returns (PGetBeResourceResponse); rpc get_tablet_rowsets(PGetTabletRowsetsRequest) returns (PGetTabletRowsetsResponse); + rpc fetch_peer_data(PFetchPeerDataRequest) returns (PFetchPeerDataResponse); }; diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy index 5f6c00be9439dc..7b16ea8e737fbb 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/ProfileAction.groovy @@ -65,7 +65,8 @@ class ProfileAction implements SuiteAction { } def httpCli = new HttpCliAction(context) - httpCli.endpoint(context.config.feHttpAddress) + def addr = context.getFeHttpAddress() + httpCli.endpoint("${addr.hostString}:${addr.port}") httpCli.uri("/rest/v1/query_profile") httpCli.op("get") httpCli.printResponse(false) @@ -87,7 +88,7 @@ class ProfileAction implements SuiteAction { def profileId = profileItem["Profile ID"].toString() def profileCli = new HttpCliAction(context) - profileCli.endpoint(context.config.feHttpAddress) + profileCli.endpoint("${addr.hostString}:${addr.port}") profileCli.uri("/rest/v1/query_profile/${profileId}") profileCli.op("get") profileCli.printResponse(false) diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy index 7a18f22bb31cf5..042adf0ac5ed27 100644 --- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy @@ -35,13 +35,26 @@ suite('test_balance_warm_up', 'docker') { 'report_tablet_interval_seconds=1', 'schedule_sync_tablets_interval_s=18000', 'disable_auto_compaction=true', - 'sys_log_verbose_modules=*' + 'sys_log_verbose_modules=*', + 'cache_read_from_peer_expired_seconds=100', + 'enable_cache_read_from_peer=true' ] options.setFeNum(1) options.setBeNum(1) options.cloudMode = true options.enableDebugPoints() + def getBrpcMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + def testCase = { table -> def ms = cluster.getAllMetaservices().get(0) def msHttpPort = ms.host + ":" + ms.httpPort @@ -74,6 +87,22 @@ suite('test_balance_warm_up', 'docker') { insert into $table values (44, 1, 'comment', 'spez', '2006-10-11 23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0), (46, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470], 'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York Times', [], 0); """ + // more tablets accessed. for test metrics `balance_tablet_be_mapping_size` + sql """CREATE TABLE more_tablets_warm_up_test_tbl ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 10 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into more_tablets_warm_up_test_tbl values (1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4'), (5, 'value5'), (6, 'value6'), (7, 'value7'), (8, 'value8'), (9, 'value9'), (10, 'value10'), (11, 'value11'), (12, 'value12'), (13, 'value13'), (14, 'value14'), (15, 'value15'), (16, 'value16'), (17, 'value17'), (18, 'value18'), (19, 'value19'), (20, 'value20'); + """ + // before add be def beforeGetFromFe = getTabletAndBeHostFromFe(table) def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) @@ -128,6 +157,8 @@ suite('test_balance_warm_up', 'docker') { } } + sql """select count(*) from more_tablets_warm_up_test_tbl""" + // from be1 -> be2, warm up this tablet // after add be def afterGetFromFe = getTabletAndBeHostFromFe(table) @@ -179,6 +210,11 @@ suite('test_balance_warm_up', 'docker') { "Expected cache file pattern ${hashFile} not found in BE ${newAddBe.Host}'s file_cache directory. " + "Available subdirs: ${subDirs}") } + + sleep(105 * 1000) + // test expired be tablet cache info be removed + // after cache_read_from_peer_expired_seconds = 100s + assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "balance_tablet_be_mapping_size")) } docker(options) { diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_use_peer_cache.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_use_peer_cache.groovy new file mode 100644 index 00000000000000..e2209931a93156 --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_use_peer_cache.groovy @@ -0,0 +1,223 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_warm_up_use_peer_cache', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=async_warmup', + 'cloud_pre_heating_time_limit_sec=30', + // disable Auto Analysis Job Executor + 'auto_check_statistics_in_minutes=60', + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*', + 'cache_read_from_peer_expired_seconds=100', + 'enable_cache_read_from_peer=true' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? (base[host] + hashFiles) : hashFiles] + } + } + + def getBrpcMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `v1` VARCHAR(2048) + ) + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_num"="1" + ); + """ + sql """ + insert into $table values (10, '1'), (20, '2') + """ + sql """ + insert into $table values (30, '3'), (40, '4') + """ + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("cache dir version 2 {}", beforeCacheDirVersion2) + // version 3 + def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("cache dir version 3 {}", beforeCacheDirVersion3) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm up result {}", beforeWarmUpResult) + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, "compute_cluster") + GetDebugPoint().enableDebugPointForAllBEs("FileCacheBlockDownloader.download_blocks.balance_task") + setFeConfig('enable_cloud_multi_replica', false) + + sleep(5 * 1000) + sql """ + insert into $table values (50, '4'), (60, '6') + """ + // version 4, new rs after warm up task + def beforeCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4) + logger.info("cache dir version 4 {}", beforeCacheDirVersion4) + def afterMerged23CacheDir = [beforeCacheDirVersion2, beforeCacheDirVersion3, beforeCacheDirVersion4] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + logger.info("after version 4 fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, afterMerged23CacheDir) + + // after cloud_pre_heating_time_limit_sec = 30s + sleep(40 * 1000) + // after 30s task timeout, check tablet in new be + + def oldBe = sql_return_maparray('show backends').get(0) + def newAddBe = sql_return_maparray('show backends').get(1) + // balance tablet + awaitUntil(500) { + def afterWarmUpTaskTimeoutResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after warm up result {}", afterWarmUpTaskTimeoutResult) + afterWarmUpTaskTimeoutResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 1 + } + } + + // from be1 -> be2, warm up this tablet + // after add be + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + logger.info("after cache dir version 2 {}", afterCacheDirVersion2) + // version 3 + def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + logger.info("after cache dir version 3 {}", afterCacheDirVersion3) + sleep(5 * 1000) + // version 4 + def afterCacheDirVersion4 = getTabletFileCacheDirFromBe(msHttpPort, table, 4) + logger.info("after cache dir version 4 {}", afterCacheDirVersion4) + + def afterMergedCacheDir = [afterCacheDirVersion2, afterCacheDirVersion3, afterCacheDirVersion4] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + + logger.info("after fe tablets {}, be tablets {}, cache dir {}", afterGetFromFe, afterGetFromBe, afterMergedCacheDir) + def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host) + logger.info("new add be cache dir {}", newAddBeCacheDir) + assert newAddBeCacheDir.size() != 0 + assert afterMerged23CacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host]) + + def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + logger.info("Checking file_cache directory: {}", dataPath.absolutePath) + logger.info("Directory exists: {}", dataPath.exists()) + + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + logger.info("Found subdir: {}", subDir.name) + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs) + + // check new be not have version 2,3,4 cache file + newAddBeCacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Expected cache file pattern ${hashFile} should not found in BE ${newAddBe.Host}'s file_cache directory. " + + "Available subdirs: ${subDirs}") + } + + // The query triggers reading the file cache from the peer + profile("test_balance_warm_up_use_peer_cache_profile") { + sql """ set enable_profile = true;""" + sql """ set profile_level = 2;""" + run { + sql """/* test_balance_warm_up_use_peer_cache_profile */ select * from $table""" + sleep(1000) + } + + check { profileString, exception -> + log.info(profileString) + // Use a regular expression to match the numeric value inside parentheses after "NumPeerIOTotal:" + def matcher = (profileString =~ /- NumPeerIOTotal:\s+(\d+)/) + def total = 0 + while (matcher.find()) { + total += matcher.group(1).toInteger() + logger.info("NumPeerIOTotal: {}", matcher.group(1)) + } + assertTrue(total > 0) + } + } + + subDirs.clear() + collectDirs(dataPath) + logger.info("after query, BE {} file_cache subdirs: {}", newAddBe.Host, subDirs) + // peer read cache, so it should have version 2,3,4 cache file + newAddBeCacheDir.each { hashFile -> + assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Expected cache file pattern ${hashFile} should found in BE ${newAddBe.Host}'s file_cache directory. " + + "Available subdirs: ${subDirs}") + } + assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read")) + assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read")) + } + + docker(options) { + testCase("test_balance_warm_up_use_peer_cache_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up_with_compaction_use_peer_cache.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_with_compaction_use_peer_cache.groovy new file mode 100644 index 00000000000000..a232a14dea1927 --- /dev/null +++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up_with_compaction_use_peer_cache.groovy @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + + +suite('test_balance_warm_up_with_compaction_use_peer_cache', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'rehash_tablet_after_be_dead_seconds=3600', + 'cloud_warm_up_for_rebalance_type=async_warmup', + 'cloud_pre_heating_time_limit_sec=30', + // disable Auto Analysis Job Executor + 'auto_check_statistics_in_minutes=60', + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'schedule_sync_tablets_interval_s=18000', + 'disable_auto_compaction=true', + 'sys_log_verbose_modules=*', + 'cumulative_compaction_min_deltas=5', + 'cache_read_from_peer_expired_seconds=100', + 'enable_cache_read_from_peer=true' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def mergeDirs = { base, add -> + base + add.collectEntries { host, hashFiles -> + [(host): base[host] ? (base[host] + hashFiles) : hashFiles] + } + } + + def getBrpcMetrics = {ip, port, name -> + def url = "http://${ip}:${port}/brpc_metrics" + def metrics = new URL(url).text + def matcher = metrics =~ ~"${name}\\s+(\\d+)" + if (matcher.find()) { + return matcher[0][1] as long + } else { + throw new RuntimeException("${name} not found for ${ip}:${port}") + } + } + + def testCase = { table -> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """set enable_file_cache=true""" + sql """CREATE TABLE $table ( + `id` BIGINT, + `deleted` TINYINT, + `type` String, + `author` String, + `timestamp` DateTimeV2, + `comment` String, + `dead` TINYINT, + `parent` BIGINT, + `poll` BIGINT, + `children` Array, + `url` String, + `score` INT, + `title` String, + `parts` Array, + `descendants` INT, + INDEX idx_comment (`comment`) USING INVERTED PROPERTIES("parser" = "english") COMMENT 'inverted index for comment' + ) + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES ("replication_num" = "1"); + """ + sql """ + insert into $table values (344083, 1, 'comment', 'spez', '2008-10-26 13:49:29', 'Stay tuned...', 0, 343906, 0, [31, 454446], '', 0, '', [], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar chicken dinnar!', 0, 31, 0, [34, 454450], '', 0, '', [], 0); + """ + sql """ + insert into $table values (44, 1, 'comment', 'spez', '2006-10-11 23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0), (46, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470], 'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York Times', [], 0); + """ + sql """ + insert into $table values (344089, 1, 'comment', 'spez', '2008-10-26 13:49:29', 'Stay tuned...', 0, 343906, 0, [31, 454446], '', 0, '', [], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar chicken dinnar!', 0, 31, 0, [34, 454450], '', 0, '', [], 0); + """ + sql """ + insert into $table values (449, 1, 'comment', 'spez', '2006-10-11 23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0), (469, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470], 'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York Times', [], 0); + """ + sql """ + insert into $table values (344084, 1, 'comment', 'spez', '2008-10-26 13:49:29', 'Stay tuned...', 0, 343906, 0, [31, 454446], '', 0, '', [], 0), (33, 0, 'comment', 'spez', '2006-10-10 23:50:40', 'winnar winnar chicken dinnar!', 0, 31, 0, [34, 454450], '', 0, '', [], 0); + """ + sql """ + insert into $table values (849, 1, 'comment', 'spez', '2006-10-11 23:00:48', 'Welcome back, Randall', 0, 43, 0, [454465], '', 0, '', [], 0), (869, 0, 'story', 'goldfish', '2006-10-11 23:39:28', '', 0, 0, 0, [454470], 'http://www.rentometer.com/', 0, ' VCs Prefer to Fund Nearby Firms - New York Times', [], 0); + """ + // trigger compaction to generate some cache files + trigger_and_wait_compaction(table, "cumulative") + sleep(5 * 1000) + + def beforeCacheDirVersion7 = getTabletFileCacheDirFromBe(msHttpPort, table, 7) + + // before add be + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + + def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("before warm up result {}", beforeWarmUpResult) + + // disable cloud balance + setFeConfig('enable_cloud_multi_replica', true) + cluster.addBackend(1, "compute_cluster") + GetDebugPoint().enableDebugPointForAllBEs("FileCacheBlockDownloader.download_blocks.balance_task") + setFeConfig('enable_cloud_multi_replica', false) + + + def afterMergedVersion7CacheDir = [beforeCacheDirVersion7] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + logger.info("after version 7 fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, afterMergedVersion7CacheDir) + + // after cloud_pre_heating_time_limit_sec = 30s + sleep(40 * 1000) + // after 30s task timeout, check tablet in new be + + def oldBe = sql_return_maparray('show backends').get(0) + def newAddBe = sql_return_maparray('show backends').get(1) + // balance tablet + awaitUntil(500) { + def afterWarmUpTaskTimeoutResult = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM $table""" + logger.info("after warm up result {}", afterWarmUpTaskTimeoutResult) + afterWarmUpTaskTimeoutResult.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 1 + } + } + + // from be1 -> be2, warm up this tablet + // after add be + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 7 + def afterCacheDirVersion7 = getTabletFileCacheDirFromBe(msHttpPort, table, 7) + logger.info("after cache dir version 7 {}", afterCacheDirVersion7) + + def afterMergedCacheDir = [afterCacheDirVersion7] + .inject([:]) { acc, m -> mergeDirs(acc, m) } + + logger.info("after fe tablets {}, be tablets {}, cache dir {}", afterGetFromFe, afterGetFromBe, afterMergedCacheDir) + // calc file cache hash on new added BE, but these cache files should not exist on new BE yet + def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host) + logger.info("new add be cache dir {}", newAddBeCacheDir) + assert newAddBeCacheDir.size() != 0 + assert afterMergedVersion7CacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host]) + + def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + logger.info("Checking file_cache directory: {}", dataPath.absolutePath) + logger.info("Directory exists: {}", dataPath.exists()) + + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + logger.info("Found subdir: {}", subDir.name) + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs) + + // check new be not have version 7 cache file + newAddBeCacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Expected cache file pattern ${hashFile} should not found in BE ${newAddBe.Host}'s file_cache directory. " + + "Available subdirs: ${subDirs}") + } + + // The query triggers reading the file cache from the peer + profile("test_balance_warm_up_with_compaction_use_peer_cache_profile") { + sql """ set enable_profile = true;""" + sql """ set profile_level = 2;""" + run { + sql """/* test_balance_warm_up_with_compaction_use_peer_cache_profile */ SELECT count() FROM $table WHERE comment MATCH_ALL 'Welcome'""" + sleep(1000) + } + + check { profileString, exception -> + log.info(profileString) + // Use a regular expression to match the numeric value inside parentheses after "NumPeerIOTotal:" + def matcher = (profileString =~ /- InvertedIndexNumPeerIOTotal:\s+(\d+)/) + def total = 0 + while (matcher.find()) { + total += matcher.group(1).toInteger() + logger.info("InvertedIndexNumPeerIOTotal: {}", matcher.group(1)) + } + assertTrue(total > 0) + } + } + subDirs.clear() + collectDirs(dataPath) + logger.info("after query, BE {} file_cache subdirs: {}", newAddBe.Host, subDirs) + // peer read cache, so it should have version 7 cache file + newAddBeCacheDir.each { hashFile -> + assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Expected cache file pattern ${hashFile} should found in BE ${newAddBe.Host}'s file_cache directory. " + + "Available subdirs: ${subDirs}") + } + assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read")) + assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read")) + } + + docker(options) { + testCase("test_balance_warm_up_with_compaction_use_peer_cache_tbl") + } +} diff --git a/regression-test/suites/cloud_p0/cache/read_from_peer/test_read_from_peer.groovy b/regression-test/suites/cloud_p0/cache/read_from_peer/test_read_from_peer.groovy new file mode 100644 index 00000000000000..7ccf118b65f9d6 --- /dev/null +++ b/regression-test/suites/cloud_p0/cache/read_from_peer/test_read_from_peer.groovy @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import groovy.json.JsonSlurper +import org.awaitility.Awaitility; +import static java.util.concurrent.TimeUnit.SECONDS; +import org.apache.doris.regression.util.NodeType + +suite('test_read_from_peer', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1', + 'workload_group_check_interval_ms=1' + ] + options.beConfigs += [ + 'file_cache_each_block_size=131072', + // 'sys_log_verbose_modules=*', + 'enable_cache_read_from_peer=true' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + options.enableDebugPoints() + + def tableName = "test_read_from_table" + + def clusterBe = { clusterName -> + def bes = sql_return_maparray "show backends" + def clusterBes = bes.findAll { be -> be.Tag.contains(clusterName) } + logger.info("cluster {}, bes {}", clusterName, clusterBes) + clusterBes[0] + } + + def testCase = { String clusterName, String runType -> + def startTime = System.currentTimeMillis() + GetDebugPoint().enableDebugPointForAllBEs("CachedRemoteFileReader.read_at_impl.change_type", [type: runType]) + + try { + sql """ + use @$clusterName + """ + + def be = clusterBe(clusterName) + def haveCacheBe = clusterBe("compute_cluster") + + switch (runType) { + case "peer": + GetDebugPoint().enableDebugPoint(be.Host, be.HttpPort as int, NodeType.BE, "PeerFileCacheReader::_fetch_from_peer_cache_blocks", + [host: haveCacheBe.Host, port: haveCacheBe.BrpcPort]) + break + case "s3": + break + default: + throw new IllegalArgumentException("Invalid type: $runType. Expected: peer, s3") + } + + // Execute the query and measure time + def queryStartTime = System.currentTimeMillis() + def ret = sql """ + select count(*) from $tableName + """ + logger.info("select ret={}", ret) + def queryTime = System.currentTimeMillis() - queryStartTime + logger.info("Test completed - Type:{}, Cluster: {}, Query execution time: {}ms", runType, clusterName, queryTime) + } catch (Exception e) { + def totalTime = System.currentTimeMillis() - startTime + logger.info("Test failed after {}ms - Type: {}, Cluster: {} Error: {}", totalTime, runType, clusterName, e.message) + throw e + } + } + + docker(options) { + // 添加一个新的cluster, 只从s3上读 + cluster.addBackend(1, "readS3cluster") + + // 添加一个新的cluster, 只从peer上读 + cluster.addBackend(1, "readPeercluster") + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + ss_sold_date_sk bigint, + ss_sold_time_sk bigint, + ss_item_sk bigint, + ss_customer_sk bigint, + ss_cdemo_sk bigint, + ss_hdemo_sk bigint, + ss_addr_sk bigint, + ss_store_sk bigint, + ss_promo_sk bigint, + ss_ticket_number bigint, + ss_quantity integer, + ss_wholesale_cost decimal(7,2), + ss_list_price decimal(7,2), + ss_sales_price decimal(7,2), + ss_ext_discount_amt decimal(7,2), + ss_ext_sales_price decimal(7,2), + ss_ext_wholesale_cost decimal(7,2), + ss_ext_list_price decimal(7,2), + ss_ext_tax decimal(7,2), + ss_coupon_amt decimal(7,2), + ss_net_paid decimal(7,2), + ss_net_paid_inc_tax decimal(7,2), + ss_net_profit decimal(7,2) + ) + DUPLICATE KEY(ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk) + DISTRIBUTED BY HASH(ss_customer_sk) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ) + """ + + sql """ + use @compute_cluster + """ + + // in compute_cluster be-1, cache all data in file cache + def txnId = -1; + // version 2 + streamLoad { + table "${tableName}" + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + set 'compress_type', 'GZ' + + file """${getS3Url()}/regression/tpcds/sf1/store_sales.dat.gz""" + // file """store_sales.dat.gz""" + + time 10000 // limit inflight 10s + setFeAddr cluster.getAllFrontends().get(0).host, cluster.getAllFrontends().get(0).httpPort + + check { res, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${res}".toString()) + def json = parseJson(res) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + + def ret = sql """ + select count(*) from $tableName + """ + logger.info("ret after load, ret {}", ret) + + testCase("compute_cluster", "s3") + testCase("readS3cluster", "s3") + testCase("readPeercluster", "peer") + } +} From fee3d88af4a85aaf5d393e8c46d881efdea43305 Mon Sep 17 00:00:00 2001 From: deardeng Date: Tue, 4 Nov 2025 17:26:56 +0800 Subject: [PATCH 2/4] fix compile --- be/src/io/cache/peer_file_cache_reader.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/io/cache/peer_file_cache_reader.cpp b/be/src/io/cache/peer_file_cache_reader.cpp index c034cdce110324..5fa376728b7d8e 100644 --- a/be/src/io/cache/peer_file_cache_reader.cpp +++ b/be/src/io/cache/peer_file_cache_reader.cpp @@ -29,6 +29,7 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "runtime/exec_env.h" #include "runtime/thread_context.h" +#include "runtime/workload_management/io_throttle.h" #include "util/brpc_client_cache.h" #include "util/bvar_helper.h" #include "util/debug_points.h" From f8d28a37482b31f179938019e7adb10f03cb0f5c Mon Sep 17 00:00:00 2001 From: deardeng Date: Tue, 4 Nov 2025 18:17:45 +0800 Subject: [PATCH 3/4] fix --- be/src/cloud/cloud_internal_service.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index dd03d29783fd16..984608da7004b4 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -274,8 +274,6 @@ void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* << ", response=" << response->DebugString(); } -#include "common/compile_check_end.h" - bvar::Adder g_file_cache_event_driven_warm_up_submitted_segment_num( "file_cache_event_driven_warm_up_submitted_segment_num"); bvar::Adder g_file_cache_event_driven_warm_up_finished_segment_num( From d07de5bdab9a39baad5b26ec472453b49c56b05a Mon Sep 17 00:00:00 2001 From: deardeng Date: Thu, 6 Nov 2025 11:10:51 +0800 Subject: [PATCH 4/4] fix compile --- be/src/cloud/cloud_warm_up_manager.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index 91226969122f39..ba582b40eaff4e 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -812,5 +812,4 @@ CloudWarmUpManager::get_all_balanced_tablets() const { return result; } -#include "common/compile_check_end.h" } // namespace doris