diff --git a/include/grpcpp/alarm.h b/include/grpcpp/alarm.h index 86356909afd6c..62a31c5bd1978 100644 --- a/include/grpcpp/alarm.h +++ b/include/grpcpp/alarm.h @@ -21,7 +21,6 @@ #ifndef GRPCPP_ALARM_H #define GRPCPP_ALARM_H -#include #include #include #include @@ -32,20 +31,9 @@ namespace grpc { -/// Trigger a \a CompletionQueue event, or asynchronous callback execution, -/// after some deadline. -/// -/// The \a Alarm API has separate \a Set methods for CompletionQueues and -/// callbacks, but only one can be used at any given time. After an alarm has -/// been triggered or cancelled, the same Alarm object may reused. -/// -/// Alarm methods are not thread-safe. Applications must ensure a strict -/// ordering between calls to \a Set and \a Cancel. This also implies that any -/// cancellation that occurs before the alarm has been set will have no effect -/// on any future \a Set calls. class Alarm : private grpc::internal::GrpcLibrary { public: - /// Create an unset Alarm. + /// Create an unset completion queue alarm Alarm(); /// Destroy the given completion queue alarm, cancelling it in the process. diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index 38e0887e5e48f..2c436b2344de3 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -497,10 +497,6 @@ void PosixEventEngine::Run(EventEngine::Closure* closure) { EventEngine::TaskHandle PosixEventEngine::RunAfterInternal( Duration when, absl::AnyInvocable cb) { - if (when <= Duration::zero()) { - Run(std::move(cb)); - return TaskHandle::kInvalid; - } auto when_ts = ToTimestamp(timer_manager_->Now(), when); auto* cd = new ClosureData; cd->cb = std::move(cb); diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index 7617ca64670d0..ad8073007b986 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -15,7 +15,6 @@ // // -#include #include #include #include @@ -24,34 +23,26 @@ #include #include -#include #include -#include #include -#include "absl/log/check.h" -#include "absl/status/status.h" -#include "src/core/lib/event_engine/default_event_engine.h" +// #include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/completion_queue.h" #include "src/core/util/time.h" namespace grpc { namespace internal { - -namespace { -using grpc_event_engine::experimental::EventEngine; -} // namespace - class AlarmImpl : public grpc::internal::CompletionQueueTag { public: - AlarmImpl() - : event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()), - cq_(nullptr), - tag_(nullptr) { + AlarmImpl() : cq_(nullptr), tag_(nullptr) { gpr_ref_init(&refs_, 1); + grpc_timer_init_unset(&timer_); } ~AlarmImpl() override {} bool FinalizeResult(void** tag, bool* /*status*/) override { @@ -60,41 +51,61 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { return true; } void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm"); cq_ = cq->cq(); tag_ = tag; CHECK(grpc_cq_begin_op(cq_, this)); - Ref(); - CHECK(cq_armed_.exchange(true) == false); - CHECK(!callback_armed_.load()); - cq_timer_handle_ = event_engine_->RunAfter( - grpc_core::Timestamp::FromTimespecRoundUp(deadline) - - grpc_core::ExecCtx::Get()->Now(), - [this] { OnCQAlarm(absl::OkStatus()); }); + GRPC_CLOSURE_INIT( + &on_alarm_, + [](void* arg, grpc_error_handle error) { + // queue the op on the completion queue + AlarmImpl* alarm = static_cast(arg); + alarm->Ref(); + // Preserve the cq and reset the cq_ so that the alarm + // can be reset when the alarm tag is delivered. + grpc_completion_queue* cq = alarm->cq_; + alarm->cq_ = nullptr; + grpc_cq_end_op( + cq, alarm, error, + [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg, + &alarm->completion_); + GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, + grpc_core::Timestamp::FromTimespecRoundUp(deadline), + &on_alarm_); } void Set(gpr_timespec deadline, std::function f) { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; // Don't use any CQ at all. Instead just use the timer to fire the function callback_ = std::move(f); Ref(); - CHECK(callback_armed_.exchange(true) == false); - CHECK(!cq_armed_.load()); - callback_timer_handle_ = event_engine_->RunAfter( - grpc_core::Timestamp::FromTimespecRoundUp(deadline) - - grpc_core::ExecCtx::Get()->Now(), - [this] { OnCallbackAlarm(true); }); + GRPC_CLOSURE_INIT( + &on_alarm_, + [](void* arg, grpc_error_handle error) { + grpc_core::Executor::Run(GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error_handle error) { + AlarmImpl* alarm = + static_cast(arg); + alarm->callback_(error.ok()); + alarm->Unref(); + }, + arg, nullptr), + error); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, + grpc_core::Timestamp::FromTimespecRoundUp(deadline), + &on_alarm_); } void Cancel() { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - if (callback_armed_.load() && - event_engine_->Cancel(callback_timer_handle_)) { - event_engine_->Run([this] { OnCallbackAlarm(/*is_ok=*/false); }); - } - if (cq_armed_.load() && event_engine_->Cancel(cq_timer_handle_)) { - event_engine_->Run( - [this] { OnCQAlarm(absl::CancelledError("cancelled")); }); - } + grpc_timer_cancel(&timer_); } void Destroy() { Cancel(); @@ -102,29 +113,6 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { } private: - void OnCQAlarm(grpc_error_handle error) { - cq_armed_.store(false); - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; - // Preserve the cq and reset the cq_ so that the alarm - // can be reset when the alarm tag is delivered. - grpc_completion_queue* cq = cq_; - cq_ = nullptr; - grpc_cq_end_op( - cq, this, error, - [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr, - &completion_); - GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); - } - - void OnCallbackAlarm(bool is_ok) { - callback_armed_.store(false); - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; - callback_(is_ok); - Unref(); - } - void Ref() { gpr_ref(&refs_); } void Unref() { if (gpr_unref(&refs_)) { @@ -132,13 +120,9 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag { } } - std::shared_ptr event_engine_; - std::atomic cq_armed_{false}; - EventEngine::TaskHandle cq_timer_handle_ = EventEngine::TaskHandle::kInvalid; - std::atomic callback_armed_{false}; - EventEngine::TaskHandle callback_timer_handle_ = - EventEngine::TaskHandle::kInvalid; + grpc_timer timer_; gpr_refcount refs_; + grpc_closure on_alarm_; grpc_cq_completion completion_; // completion queue where events about this alarm will be posted grpc_completion_queue* cq_; diff --git a/test/cpp/common/alarm_test.cc b/test/cpp/common/alarm_test.cc index 15161fa2b3119..1a12de0025336 100644 --- a/test/cpp/common/alarm_test.cc +++ b/test/cpp/common/alarm_test.cc @@ -25,6 +25,7 @@ #include #include +#include "absl/log/log.h" #include "src/core/util/notification.h" #include "test/core/test_util/test_config.h" @@ -307,30 +308,6 @@ TEST(AlarmTest, Cancellation) { EXPECT_EQ(junk, output_tag); } -TEST(AlarmTest, CancellationMultiSet) { - // Tests the cancellation and re-Set paths together. - CompletionQueue cq; - void* junk = reinterpret_cast(1618033); - Alarm alarm; - // First iteration - alarm.Set(&cq, grpc_timeout_seconds_to_deadline(5), junk); - alarm.Cancel(); - void* output_tag; - bool ok; - CompletionQueue::NextStatus status = - cq.AsyncNext(&output_tag, &ok, grpc_timeout_seconds_to_deadline(10)); - EXPECT_EQ(status, CompletionQueue::GOT_EVENT); - EXPECT_FALSE(ok); - EXPECT_EQ(junk, output_tag); - // Second iteration - alarm.Set(&cq, grpc_timeout_seconds_to_deadline(5), junk); - alarm.Cancel(); - status = cq.AsyncNext(&output_tag, &ok, grpc_timeout_seconds_to_deadline(10)); - EXPECT_EQ(status, CompletionQueue::GOT_EVENT); - EXPECT_FALSE(ok); - EXPECT_EQ(junk, output_tag); -} - TEST(AlarmTest, CallbackCancellation) { Alarm alarm; @@ -350,33 +327,6 @@ TEST(AlarmTest, CallbackCancellation) { [c] { return c->completed; })); } -TEST(AlarmTest, CallbackCancellationMultiSet) { - // Tests the cancellation and re-Set paths. - Alarm alarm; - // First iteration - { - grpc_core::Notification notification; - alarm.Set(std::chrono::system_clock::now() + std::chrono::seconds(10), - [¬ification](bool ok) { - EXPECT_FALSE(ok); - notification.Notify(); - }); - alarm.Cancel(); - notification.WaitForNotification(); - } - // First iteration - { - grpc_core::Notification notification; - alarm.Set(std::chrono::system_clock::now() + std::chrono::seconds(10), - [¬ification](bool ok) { - EXPECT_FALSE(ok); - notification.Notify(); - }); - alarm.Cancel(); - notification.WaitForNotification(); - } -} - TEST(AlarmTest, CallbackCancellationLocked) { Alarm alarm; @@ -438,6 +388,23 @@ TEST(AlarmTest, UnsetDestruction) { Alarm alarm; } +TEST(AlarmTest, AlarmReuse) { + Alarm alarm; + CompletionQueue cq; + std::thread polling_thread([&]() { + void* tag; + bool ok = false; + while (cq.Next(&tag, &ok)) { + LOG(INFO) << "ok: " << ok; + } + }); + while (true) { + alarm.Set(&cq, gpr_timespec{0, 0, GPR_TIMESPAN}, nullptr); + alarm.Cancel(); + } + polling_thread.join(); +} + } // namespace } // namespace grpc