From a277f6a3029386dfb0ba961b5d2ecb309158bb46 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 29 Apr 2026 16:29:38 +0800 Subject: [PATCH] refactor(util): make retry policy explicit and deterministic --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/test/retry_util_test.cc | 397 ++++++++++++++++++++++--- src/iceberg/util/retry_util.cc | 170 +++++++++++ src/iceberg/util/retry_util.h | 154 +++++----- src/iceberg/util/retry_util_internal.h | 60 ++++ 6 files changed, 661 insertions(+), 122 deletions(-) create mode 100644 src/iceberg/util/retry_util.cc create mode 100644 src/iceberg/util/retry_util_internal.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 603c35343..73d42817e 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -114,6 +114,7 @@ set(ICEBERG_SOURCES util/gzip_internal.cc util/murmurhash3_internal.cc util/property_util.cc + util/retry_util.cc util/snapshot_util.cc util/string_util.cc util/struct_like_set.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 5e68def98..4ec9101db 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -136,6 +136,7 @@ iceberg_sources = files( 'util/gzip_internal.cc', 'util/murmurhash3_internal.cc', 'util/property_util.cc', + 'util/retry_util.cc', 'util/snapshot_util.cc', 'util/string_util.cc', 'util/struct_like_set.cc', diff --git a/src/iceberg/test/retry_util_test.cc b/src/iceberg/test/retry_util_test.cc index 05c7cb0e1..6d1a7c225 100644 --- a/src/iceberg/test/retry_util_test.cc +++ b/src/iceberg/test/retry_util_test.cc @@ -19,15 +19,73 @@ #include "iceberg/util/retry_util.h" -#include -#include +#include +#include +#include #include #include "iceberg/result.h" #include "iceberg/test/matchers.h" +#include "iceberg/util/retry_util_internal.h" namespace iceberg { +namespace { + +struct ResultReturningTask { + Result operator()() const { return 1; } +}; + +struct NonResultReturningTask { + int operator()() const { 
return 1; } +}; + +static_assert(detail::RetryTask); +static_assert(!detail::RetryTask); +static_assert(requires(RetryRunner runner, ResultReturningTask task) { + { runner.Run(task) } -> std::same_as>; +}); + +class FakeRetryEnvironment { + public: + using Duration = RetryTestHooks::Duration; + using TimePoint = RetryTestHooks::TimePoint; + + FakeRetryEnvironment() { + hooks_.now = [this]() { return now_; }; + hooks_.sleep_for = [this](Duration duration) { + sleep_durations_.push_back(duration); + now_ += duration; + }; + hooks_.jitter = [this](int32_t base_delay_ms) { + observed_base_delays_ms_.push_back(base_delay_ms); + return base_delay_ms + jitter_offset_ms_; + }; + } + + void Advance(Duration duration) { now_ += duration; } + + void SetJitterOffsetMs(int32_t jitter_offset_ms) { + jitter_offset_ms_ = jitter_offset_ms; + } + + const RetryTestHooks& hooks() const { return hooks_; } + + const std::vector& sleep_durations() const { return sleep_durations_; } + + const std::vector& observed_base_delays_ms() const { + return observed_base_delays_ms_; + } + + private: + RetryTestHooks hooks_; + TimePoint now_{}; + int32_t jitter_offset_ms_ = 0; + std::vector sleep_durations_; + std::vector observed_base_delays_ms_; +}; + +} // namespace TEST(RetryRunnerTest, SuccessOnFirstAttempt) { int call_count = 0; @@ -37,6 +95,7 @@ TEST(RetryRunnerTest, SuccessOnFirstAttempt) { .min_wait_ms = 1, .max_wait_ms = 10, .total_timeout_ms = 5000}) + .OnlyRetryOn(ErrorKind::kCommitFailed) .Run( [&]() -> Result { ++call_count; @@ -58,6 +117,7 @@ TEST(RetryRunnerTest, RetryOnceThenSucceed) { .min_wait_ms = 1, .max_wait_ms = 10, .total_timeout_ms = 5000}) + .OnlyRetryOn(ErrorKind::kCommitFailed) .Run( [&]() -> Result { ++call_count; @@ -82,6 +142,7 @@ TEST(RetryRunnerTest, MaxAttemptsExhausted) { .min_wait_ms = 1, .max_wait_ms = 10, .total_timeout_ms = 5000}) + .OnlyRetryOn(ErrorKind::kCommitFailed) .Run( [&]() -> Result { ++call_count; @@ -140,6 +201,32 @@ TEST(RetryRunnerTest, 
OnlyRetryOnMatchingError) { EXPECT_EQ(attempts, 3); } +TEST(RetryRunnerTest, OnlyRetryOnTakesPrecedenceOverStopRetryOn) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 2, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .OnlyRetryOn(ErrorKind::kCommitFailed) + .StopRetryOn({ErrorKind::kCommitFailed}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("transient"); + } + return 100; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 100); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(attempts, 2); +} + TEST(RetryRunnerTest, StopRetryOnMatchingError) { int call_count = 0; int32_t attempts = 0; @@ -161,14 +248,40 @@ TEST(RetryRunnerTest, StopRetryOnMatchingError) { EXPECT_EQ(attempts, 1); } -TEST(RetryRunnerTest, ZeroRetries) { +TEST(RetryRunnerTest, StopRetryOnNonMatchingErrorAllowsRetry) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 0, + auto result = RetryRunner(RetryConfig{.num_retries = 2, .min_wait_ms = 1, .max_wait_ms = 10, .total_timeout_ms = 5000}) + .StopRetryOn({ErrorKind::kCommitStateUnknown}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("retryable"); + } + return 88; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 88); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(attempts, 2); +} + +TEST(RetryRunnerTest, ZeroRetriesAllowsUnsetPolicyAndSkipsBackoffValidation) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 0, + .min_wait_ms = 0, + .max_wait_ms = 0, + .total_timeout_ms = 5000, + .scale_factor = 0.5}) .Run( [&]() -> Result { ++call_count; @@ -181,7 +294,159 @@ TEST(RetryRunnerTest, ZeroRetries) { EXPECT_EQ(attempts, 1); } +TEST(RetryRunnerTest, NegativeRetriesFailsBeforeTaskRuns) { + int call_count = 0; + int32_t attempts = 0; + + auto result = 
RetryRunner(RetryConfig{.num_retries = -1, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + return 1; + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("num_retries must be non-negative")); + EXPECT_EQ(call_count, 0); + EXPECT_EQ(attempts, 0); +} + +TEST(RetryRunnerTest, InvalidBackoffConfigFailsBeforeTaskRuns) { + struct InvalidConfigCase { + RetryConfig config; + const char* expected_message; + }; + + const std::vector test_cases = { + {RetryConfig{.num_retries = std::numeric_limits::max(), + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}, + "num_retries is too large"}, + {RetryConfig{.num_retries = 1, + .min_wait_ms = 0, + .max_wait_ms = 10, + .total_timeout_ms = 5000}, + "min_wait_ms must be positive"}, + {RetryConfig{.num_retries = 1, + .min_wait_ms = 1, + .max_wait_ms = 0, + .total_timeout_ms = 5000}, + "max_wait_ms must be positive"}, + {RetryConfig{.num_retries = 1, + .min_wait_ms = 20, + .max_wait_ms = 10, + .total_timeout_ms = 5000}, + "max_wait_ms must be greater than or equal to min_wait_ms"}, + {RetryConfig{.num_retries = 1, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000, + .scale_factor = 0.5}, + "scale_factor must be finite and at least 1.0"}, + {RetryConfig{.num_retries = 1, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000, + .scale_factor = std::numeric_limits::infinity()}, + "scale_factor must be finite and at least 1.0"}, + }; + + for (const auto& test_case : test_cases) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(test_case.config) + .OnlyRetryOn(ErrorKind::kCommitFailed) + .Run( + [&]() -> Result { + ++call_count; + return 1; + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)) + << test_case.expected_message; + EXPECT_THAT(result, HasErrorMessage(test_case.expected_message)); + 
EXPECT_EQ(call_count, 0); + EXPECT_EQ(attempts, 0); + } +} + +TEST(RetryRunnerTest, UnsetRetryPolicyFailsBeforeTaskRuns) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 1, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + return CommitFailed("fail"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("Retry policy must be explicitly configured")); + EXPECT_EQ(call_count, 0); + EXPECT_EQ(attempts, 0); +} + +TEST(RetryRunnerTest, EmptyOnlyRetryOnPolicyFailsBeforeTaskRuns) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 1, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .OnlyRetryOn(std::initializer_list{}) + .Run( + [&]() -> Result { + ++call_count; + return CommitFailed("fail"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, + HasErrorMessage("Retry policy must include at least one error kind")); + EXPECT_EQ(call_count, 0); + EXPECT_EQ(attempts, 0); +} + +TEST(RetryRunnerTest, EmptyStopRetryOnPolicyFailsBeforeTaskRuns) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 1, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .StopRetryOn({}) + .Run( + [&]() -> Result { + ++call_count; + return CommitFailed("fail"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, + HasErrorMessage("Retry policy must include at least one error kind")); + EXPECT_EQ(call_count, 0); + EXPECT_EQ(attempts, 0); +} + TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) { + FakeRetryEnvironment fake_retry; + ScopedRetryTestHooks scoped_hooks(fake_retry.hooks()); int call_count = 0; int32_t attempts = 0; @@ -189,14 +454,12 @@ 
TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) { .min_wait_ms = 20, .max_wait_ms = 20, .total_timeout_ms = 15}) + .OnlyRetryOn(ErrorKind::kCommitFailed) .Run( [&]() -> Result { ++call_count; - // The first failure consumes most of the 15 ms budget, so the - // next 20 ms backoff should prevent another attempt from - // starting. if (call_count == 1) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + fake_retry.Advance(FakeRetryEnvironment::Duration(10)); } return CommitFailed("retry budget exhausted"); }, @@ -205,6 +468,97 @@ TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) { EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); EXPECT_EQ(call_count, 1); EXPECT_EQ(attempts, 1); + EXPECT_TRUE(fake_retry.sleep_durations().empty()); + EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector({20})); +} + +TEST(RetryRunnerTest, TotalTimeoutStopsWhenDelayEqualsRemainingBudget) { + FakeRetryEnvironment fake_retry; + ScopedRetryTestHooks scoped_hooks(fake_retry.hooks()); + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 10, + .max_wait_ms = 10, + .total_timeout_ms = 20}) + .OnlyRetryOn(ErrorKind::kCommitFailed) + .Run( + [&]() -> Result { + ++call_count; + fake_retry.Advance(FakeRetryEnvironment::Duration(10)); + return CommitFailed("retry budget exhausted"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); + EXPECT_TRUE(fake_retry.sleep_durations().empty()); + EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector({10})); +} + +TEST(RetryRunnerTest, NonPositiveTotalTimeoutDisablesDeadline) { + FakeRetryEnvironment fake_retry; + ScopedRetryTestHooks scoped_hooks(fake_retry.hooks()); + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 2, + .min_wait_ms = 10, + .max_wait_ms = 10, + .total_timeout_ms = 0}) 
+ .OnlyRetryOn(ErrorKind::kCommitFailed) + .Run( + [&]() -> Result { + ++call_count; + fake_retry.Advance(FakeRetryEnvironment::Duration(100)); + if (call_count <= 2) { + return CommitFailed("transient"); + } + return 123; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 123); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); + EXPECT_EQ(fake_retry.sleep_durations(), std::vector( + {FakeRetryEnvironment::Duration(10), + FakeRetryEnvironment::Duration(10)})); + EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector({10, 10})); +} + +TEST(RetryRunnerTest, RetryDelayDoesNotExceedMaxWaitAfterJitter) { + FakeRetryEnvironment fake_retry; + fake_retry.SetJitterOffsetMs(100); + ScopedRetryTestHooks scoped_hooks(fake_retry.hooks()); + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 1, + .min_wait_ms = 10, + .max_wait_ms = 10, + .total_timeout_ms = 0}) + .OnlyRetryOn(ErrorKind::kCommitFailed) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("transient"); + } + return 321; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 321); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(attempts, 2); + EXPECT_EQ(fake_retry.sleep_durations(), std::vector( + {FakeRetryEnvironment::Duration(10)})); + EXPECT_EQ(fake_retry.observed_base_delays_ms(), std::vector({10})); } TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) { @@ -274,31 +628,4 @@ TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) { EXPECT_EQ(attempts, 3); } -TEST(RetryRunnerTest, DefaultRetryAllErrors) { - int call_count = 0; - int32_t attempts = 0; - - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .Run( - [&]() -> Result { - ++call_count; - if (call_count == 1) { - return IOError("disk full"); - } - if (call_count == 2) { - return ValidationFailed("bad schema"); - } - return 55; - }, - &attempts); - - 
EXPECT_THAT(result, IsOk()); - EXPECT_EQ(*result, 55); - EXPECT_EQ(call_count, 3); - EXPECT_EQ(attempts, 3); -} - } // namespace iceberg diff --git a/src/iceberg/util/retry_util.cc b/src/iceberg/util/retry_util.cc new file mode 100644 index 000000000..3986c6fba --- /dev/null +++ b/src/iceberg/util/retry_util.cc @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/util/retry_util.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/util/retry_util_internal.h" + +namespace iceberg { +namespace { + +RetryTestHooks::TimePoint RetryNow() { + if (active_retry_test_hooks != nullptr && active_retry_test_hooks->now) { + return active_retry_test_hooks->now(); + } + return RetryTestHooks::Clock::now(); +} + +void RetrySleepFor(RetryTestHooks::Duration duration) { + if (active_retry_test_hooks != nullptr && active_retry_test_hooks->sleep_for) { + active_retry_test_hooks->sleep_for(duration); + return; + } + std::this_thread::sleep_for(duration); +} + +int32_t ApplyRetryJitter(int32_t base_delay_ms) { + if (active_retry_test_hooks != nullptr && active_retry_test_hooks->jitter) { + return active_retry_test_hooks->jitter(base_delay_ms); + } + + static thread_local std::mt19937 gen(std::random_device{}()); + const int32_t jitter_range = std::max(1, base_delay_ms / 10); + std::uniform_int_distribution<> dis(0, jitter_range - 1); + const int64_t jittered_delay_ms = static_cast(base_delay_ms) + dis(gen); + return static_cast( + std::min(jittered_delay_ms, std::numeric_limits::max())); +} + +} // namespace + +Status RetryRunner::ValidateConfig() const { + if (config_.num_retries < 0) { + return InvalidArgument("num_retries must be non-negative, got {}", + config_.num_retries); + } + if (config_.num_retries == 0) { + return {}; + } + if (config_.num_retries == std::numeric_limits::max()) { + return InvalidArgument("num_retries is too large, got {}", config_.num_retries); + } + if (config_.min_wait_ms <= 0) { + return InvalidArgument("min_wait_ms must be positive, got {}", config_.min_wait_ms); + } + if (config_.max_wait_ms <= 0) { + return InvalidArgument("max_wait_ms must be positive, got {}", config_.max_wait_ms); + } + if (config_.max_wait_ms < config_.min_wait_ms) { + return InvalidArgument("max_wait_ms must be greater than or equal to min_wait_ms"); + } + if 
(!std::isfinite(config_.scale_factor) || config_.scale_factor < 1.0) { + return InvalidArgument("scale_factor must be finite and at least 1.0, got {}", + config_.scale_factor); + } + if (retry_policy_mode_ == RetryPolicyMode::kUnset) { + return InvalidArgument( + "Retry policy must be explicitly configured with OnlyRetryOn(...) or " + "StopRetryOn(...) when num_retries > 0"); + } + if (retry_error_kinds_.empty()) { + return InvalidArgument("Retry policy must include at least one error kind"); + } + + return {}; +} + +std::optional RetryRunner::ComputeDeadline() const { + if (config_.total_timeout_ms <= 0) { + return std::nullopt; + } + return RetryNow() + Duration(config_.total_timeout_ms); +} + +bool RetryRunner::HasTimedOut(const std::optional& deadline) const { + return deadline.has_value() && RetryNow() >= *deadline; +} + +bool RetryRunner::ShouldRetry(ErrorKind kind) const { + const bool policy_contains_kind = std::ranges::contains(retry_error_kinds_, kind); + switch (retry_policy_mode_) { + case RetryPolicyMode::kOnlyRetryOn: + return policy_contains_kind; + case RetryPolicyMode::kStopRetryOn: + return !policy_contains_kind; + case RetryPolicyMode::kUnset: + return false; + } + return false; +} + +bool RetryRunner::CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts, + const std::optional& deadline) const { + return attempt < max_attempts && !HasTimedOut(deadline) && ShouldRetry(kind); +} + +std::optional RetryRunner::RetryDelayWithinBudget( + int32_t attempt, const std::optional& deadline) const { + const auto delay = Duration(CalculateDelay(attempt)); + if (!deadline.has_value()) { + return delay; + } + + const auto now = RetryNow(); + if (now >= *deadline) { + return std::nullopt; + } + + const auto remaining = std::chrono::duration_cast(*deadline - now); + if (remaining <= Duration::zero() || delay >= remaining) { + return std::nullopt; + } + + return delay; +} + +bool RetryRunner::WaitForNextAttempt(int32_t attempt, + const std::optional& 
deadline) const { + const auto delay = RetryDelayWithinBudget(attempt, deadline); + if (!delay.has_value()) { + return false; + } + + RetrySleepFor(*delay); + return !HasTimedOut(deadline); +} + +int32_t RetryRunner::CalculateDelay(int32_t attempt) const { + const double base_delay = + config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1); + const int32_t delay_ms = static_cast( + std::min(base_delay, static_cast(config_.max_wait_ms))); + return std::clamp(ApplyRetryJitter(delay_ms), 1, config_.max_wait_ms); +} + +} // namespace iceberg diff --git a/src/iceberg/util/retry_util.h b/src/iceberg/util/retry_util.h index 7041a40ef..caa8642d1 100644 --- a/src/iceberg/util/retry_util.h +++ b/src/iceberg/util/retry_util.h @@ -19,12 +19,14 @@ #pragma once -#include #include -#include +#include +#include +#include +#include #include -#include -#include +#include +#include #include #include "iceberg/iceberg_export.h" @@ -32,6 +34,27 @@ namespace iceberg { +namespace detail { + +template +struct IsResult : std::false_type {}; + +template +struct IsResult> : std::true_type {}; + +template +concept ResultType = IsResult>::value; + +template +concept RetryTask = requires(F& f) { + { std::invoke(f) } -> ResultType; +}; + +template +using RetryTaskResult = std::remove_cvref_t>; + +} // namespace detail + /// \brief Configuration for retry behavior struct ICEBERG_EXPORT RetryConfig { /// Maximum number of retry attempts (not including the first attempt) @@ -47,6 +70,9 @@ struct ICEBERG_EXPORT RetryConfig { }; /// \brief Utility class for running tasks with retry logic +/// +/// When retries are enabled (`num_retries > 0`), callers must explicitly configure +/// retry policy with `OnlyRetryOn(...)` or `StopRetryOn(...)`. class ICEBERG_EXPORT RetryRunner { public: /// \brief Construct a RetryRunner with the given configuration @@ -60,7 +86,8 @@ class ICEBERG_EXPORT RetryRunner { /// \note OnlyRetryOn takes priority over StopRetryOn. 
If OnlyRetryOn is set, /// StopRetryOn is ignored. RetryRunner& OnlyRetryOn(std::initializer_list error_kinds) { - only_retry_on_ = std::vector(error_kinds); + retry_policy_mode_ = RetryPolicyMode::kOnlyRetryOn; + retry_error_kinds_ = std::vector(error_kinds); return *this; } @@ -68,10 +95,7 @@ class ICEBERG_EXPORT RetryRunner { /// /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set, /// StopRetryOn is ignored. - RetryRunner& OnlyRetryOn(ErrorKind error_kind) { - only_retry_on_ = std::vector{error_kind}; - return *this; - } + RetryRunner& OnlyRetryOn(ErrorKind error_kind) { return OnlyRetryOn({error_kind}); } /// \brief Specify error types that should stop retries immediately. /// @@ -81,18 +105,29 @@ class ICEBERG_EXPORT RetryRunner { /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set, /// StopRetryOn is ignored. RetryRunner& StopRetryOn(std::initializer_list error_kinds) { - stop_retry_on_ = std::vector(error_kinds); + if (retry_policy_mode_ == RetryPolicyMode::kOnlyRetryOn) { + return *this; + } + + retry_policy_mode_ = RetryPolicyMode::kStopRetryOn; + retry_error_kinds_ = std::vector(error_kinds); return *this; } /// \brief Run a task that returns a Result /// + /// When `num_retries > 0`, the retry policy must be configured explicitly via + /// `OnlyRetryOn(...)` or `StopRetryOn(...)`. + /// /// TODO: Replace attempt_counter with a metrics reporter once it is available. 
- template ::value_type> - Result Run(F&& task, int32_t* attempt_counter = nullptr) { - if (config_.num_retries < 0) { - return InvalidArgument("num_retries must be non-negative, got {}", - config_.num_retries); + template + requires detail::RetryTask + auto Run(F&& task, int32_t* attempt_counter = nullptr) -> detail::RetryTaskResult { + using TaskResult = detail::RetryTaskResult; + + const auto validation = ValidateConfig(); + if (!validation.has_value()) { + return TaskResult(std::unexpected(validation.error())); } const auto deadline = ComputeDeadline(); @@ -105,7 +140,7 @@ class ICEBERG_EXPORT RetryRunner { *attempt_counter = attempt; } - auto result = task(); + auto result = std::invoke(task); if (result.has_value()) { return result; } @@ -121,89 +156,34 @@ class ICEBERG_EXPORT RetryRunner { } private: + enum class RetryPolicyMode { + kUnset, + kOnlyRetryOn, + kStopRetryOn, + }; + using Clock = std::chrono::steady_clock; using Duration = std::chrono::milliseconds; using TimePoint = Clock::time_point; - std::optional ComputeDeadline() const { - if (config_.total_timeout_ms <= 0) { - return std::nullopt; - } - return Clock::now() + Duration(config_.total_timeout_ms); - } - - bool HasTimedOut(const std::optional& deadline) const { - return deadline.has_value() && Clock::now() >= *deadline; - } + Status ValidateConfig() const; + std::optional ComputeDeadline() const; + bool HasTimedOut(const std::optional& deadline) const; /// \brief Check if the given error kind should trigger a retry. 
- bool ShouldRetry(ErrorKind kind) const { - if (!only_retry_on_.empty()) { - return std::ranges::any_of(only_retry_on_, - [kind](ErrorKind k) { return kind == k; }); - } - - if (!stop_retry_on_.empty()) { - return !std::ranges::any_of(stop_retry_on_, - [kind](ErrorKind k) { return kind == k; }); - } - - return true; - } - + bool ShouldRetry(ErrorKind kind) const; bool CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts, - const std::optional& deadline) const { - return attempt < max_attempts && !HasTimedOut(deadline) && ShouldRetry(kind); - } - + const std::optional& deadline) const; std::optional RetryDelayWithinBudget( - int32_t attempt, const std::optional& deadline) const { - const auto delay = Duration(CalculateDelay(attempt)); - if (!deadline.has_value()) { - return delay; - } - - const auto now = Clock::now(); - if (now >= *deadline) { - return std::nullopt; - } - - const auto remaining = std::chrono::duration_cast(*deadline - now); - if (remaining <= Duration::zero() || delay >= remaining) { - return std::nullopt; - } - - return delay; - } - + int32_t attempt, const std::optional& deadline) const; bool WaitForNextAttempt(int32_t attempt, - const std::optional& deadline) const { - const auto delay = RetryDelayWithinBudget(attempt, deadline); - if (!delay.has_value()) { - return false; - } - - std::this_thread::sleep_for(*delay); - return !HasTimedOut(deadline); - } - + const std::optional& deadline) const; /// \brief Calculate delay with exponential backoff and jitter - int32_t CalculateDelay(int32_t attempt) const { - // Calculate base delay with exponential backoff - double base_delay = config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1); - int32_t delay_ms = static_cast( - std::min(base_delay, static_cast(config_.max_wait_ms))); - - static thread_local std::mt19937 gen(std::random_device{}()); - int32_t jitter_range = std::max(1, delay_ms / 10); - std::uniform_int_distribution<> dis(0, jitter_range - 1); - delay_ms += dis(gen); - 
return std::max(1, delay_ms); - } + int32_t CalculateDelay(int32_t attempt) const; RetryConfig config_; - std::vector only_retry_on_; - std::vector stop_retry_on_; + RetryPolicyMode retry_policy_mode_ = RetryPolicyMode::kUnset; + std::vector retry_error_kinds_; }; /// \brief Helper function to create a RetryRunner with table commit configuration diff --git a/src/iceberg/util/retry_util_internal.h b/src/iceberg/util/retry_util_internal.h new file mode 100644 index 000000000..b8d053b25 --- /dev/null +++ b/src/iceberg/util/retry_util_internal.h @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +namespace iceberg { + +struct RetryTestHooks { + using Clock = std::chrono::steady_clock; + using Duration = std::chrono::milliseconds; + using TimePoint = Clock::time_point; + + std::function now; + std::function sleep_for; + std::function jitter; +}; + +// Keep test hooks thread-local so fake retry timing in one test thread does not +// leak into unrelated retry work or require synchronization around a global pointer. 
+inline thread_local const RetryTestHooks* active_retry_test_hooks = nullptr;
+
+/// \brief RAII guard that installs retry test hooks for the calling thread.
+///
+/// On construction the guard records the currently active hooks pointer and makes
+/// `hooks` active; on destruction it restores the recorded pointer, so guards can be
+/// nested within a thread. Only the address of `hooks` is stored, so the referenced
+/// object must outlive the guard.
+class ScopedRetryTestHooks {
+ public:
+  /// \brief Activate `hooks` for the calling thread until this guard is destroyed.
+  explicit ScopedRetryTestHooks(const RetryTestHooks& hooks)
+      : previous_hooks_(active_retry_test_hooks) {
+    active_retry_test_hooks = &hooks;
+  }
+
+  /// Reject temporaries: the guard keeps a pointer to `hooks`, so binding an rvalue
+  /// would leave `active_retry_test_hooks` dangling once the full expression ends.
+  ScopedRetryTestHooks(const RetryTestHooks&&) = delete;
+
+  // The guard owns a single "restore on exit" obligation; copying or moving it
+  // would restore the previous pointer more than once or in the wrong order.
+  ScopedRetryTestHooks(const ScopedRetryTestHooks&) = delete;
+  ScopedRetryTestHooks& operator=(const ScopedRetryTestHooks&) = delete;
+  ScopedRetryTestHooks(ScopedRetryTestHooks&&) = delete;
+  ScopedRetryTestHooks& operator=(ScopedRetryTestHooks&&) = delete;
+
+  /// \brief Restore the hooks pointer that was active when the guard was created.
+  ~ScopedRetryTestHooks() { active_retry_test_hooks = previous_hooks_; }
+
+ private:
+  /// Hooks pointer (possibly null) that was active before this guard was installed.
+  const RetryTestHooks* previous_hooks_;
+};
+
+}  // namespace iceberg