Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions include/grpcpp/alarm.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#ifndef GRPCPP_ALARM_H
#define GRPCPP_ALARM_H

#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/impl/completion_queue_tag.h>
Expand All @@ -32,20 +31,9 @@

namespace grpc {

/// Trigger a \a CompletionQueue event, or asynchronous callback execution,
/// after some deadline.
///
/// The \a Alarm API has separate \a Set methods for CompletionQueues and
/// callbacks, but only one can be used at any given time. After an alarm has
/// been triggered or cancelled, the same Alarm object may reused.
///
/// Alarm methods are not thread-safe. Applications must ensure a strict
/// ordering between calls to \a Set and \a Cancel. This also implies that any
/// cancellation that occurs before the alarm has been set will have no effect
/// on any future \a Set calls.
class Alarm : private grpc::internal::GrpcLibrary {
public:
/// Create an unset Alarm.
/// Create an unset completion queue alarm
Alarm();

/// Destroy the given completion queue alarm, cancelling it in the process.
Expand Down
4 changes: 0 additions & 4 deletions src/core/lib/event_engine/posix_engine/posix_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,6 @@ void PosixEventEngine::Run(EventEngine::Closure* closure) {

EventEngine::TaskHandle PosixEventEngine::RunAfterInternal(
Duration when, absl::AnyInvocable<void()> cb) {
if (when <= Duration::zero()) {
Run(std::move(cb));
return TaskHandle::kInvalid;
}
auto when_ts = ToTimestamp(timer_manager_->Now(), when);
auto* cd = new ClosureData;
cd->cb = std::move(cb);
Expand Down
114 changes: 49 additions & 65 deletions src/cpp/common/alarm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//
//

#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
Expand All @@ -24,34 +23,26 @@
#include <grpcpp/completion_queue.h>
#include <grpcpp/impl/completion_queue_tag.h>

#include <atomic>
#include <functional>
#include <memory>
#include <utility>

#include "absl/log/check.h"
#include "absl/status/status.h"
#include "src/core/lib/event_engine/default_event_engine.h"
// #include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/util/time.h"

namespace grpc {

namespace internal {

namespace {
using grpc_event_engine::experimental::EventEngine;
} // namespace

class AlarmImpl : public grpc::internal::CompletionQueueTag {
public:
AlarmImpl()
: event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()),
cq_(nullptr),
tag_(nullptr) {
AlarmImpl() : cq_(nullptr), tag_(nullptr) {
gpr_ref_init(&refs_, 1);
grpc_timer_init_unset(&timer_);
}
~AlarmImpl() override {}
bool FinalizeResult(void** tag, bool* /*status*/) override {
Expand All @@ -60,85 +51,78 @@ class AlarmImpl : public grpc::internal::CompletionQueueTag {
return true;
}
void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
cq_ = cq->cq();
tag_ = tag;
CHECK(grpc_cq_begin_op(cq_, this));
Ref();
CHECK(cq_armed_.exchange(true) == false);
CHECK(!callback_armed_.load());
cq_timer_handle_ = event_engine_->RunAfter(
grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
grpc_core::ExecCtx::Get()->Now(),
[this] { OnCQAlarm(absl::OkStatus()); });
GRPC_CLOSURE_INIT(
&on_alarm_,
[](void* arg, grpc_error_handle error) {
// queue the op on the completion queue
AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
alarm->Ref();
// Preserve the cq and reset the cq_ so that the alarm
// can be reset when the alarm tag is delivered.
grpc_completion_queue* cq = alarm->cq_;
alarm->cq_ = nullptr;
grpc_cq_end_op(
cq, alarm, error,
[](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg,
&alarm->completion_);
GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
},
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_,
grpc_core::Timestamp::FromTimespecRoundUp(deadline),
&on_alarm_);
}
void Set(gpr_timespec deadline, std::function<void(bool)> f) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
// Don't use any CQ at all. Instead just use the timer to fire the function
callback_ = std::move(f);
Ref();
CHECK(callback_armed_.exchange(true) == false);
CHECK(!cq_armed_.load());
callback_timer_handle_ = event_engine_->RunAfter(
grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
grpc_core::ExecCtx::Get()->Now(),
[this] { OnCallbackAlarm(true); });
GRPC_CLOSURE_INIT(
&on_alarm_,
[](void* arg, grpc_error_handle error) {
grpc_core::Executor::Run(GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle error) {
AlarmImpl* alarm =
static_cast<AlarmImpl*>(arg);
alarm->callback_(error.ok());
alarm->Unref();
},
arg, nullptr),
error);
},
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_,
grpc_core::Timestamp::FromTimespecRoundUp(deadline),
&on_alarm_);
}
void Cancel() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
if (callback_armed_.load() &&
event_engine_->Cancel(callback_timer_handle_)) {
event_engine_->Run([this] { OnCallbackAlarm(/*is_ok=*/false); });
}
if (cq_armed_.load() && event_engine_->Cancel(cq_timer_handle_)) {
event_engine_->Run(
[this] { OnCQAlarm(absl::CancelledError("cancelled")); });
}
grpc_timer_cancel(&timer_);
}
void Destroy() {
Cancel();
Unref();
}

private:
void OnCQAlarm(grpc_error_handle error) {
cq_armed_.store(false);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
// Preserve the cq and reset the cq_ so that the alarm
// can be reset when the alarm tag is delivered.
grpc_completion_queue* cq = cq_;
cq_ = nullptr;
grpc_cq_end_op(
cq, this, error,
[](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr,
&completion_);
GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
}

void OnCallbackAlarm(bool is_ok) {
callback_armed_.store(false);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
callback_(is_ok);
Unref();
}

void Ref() { gpr_ref(&refs_); }
void Unref() {
if (gpr_unref(&refs_)) {
delete this;
}
}

std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
std::atomic<bool> cq_armed_{false};
EventEngine::TaskHandle cq_timer_handle_ = EventEngine::TaskHandle::kInvalid;
std::atomic<bool> callback_armed_{false};
EventEngine::TaskHandle callback_timer_handle_ =
EventEngine::TaskHandle::kInvalid;
grpc_timer timer_;
gpr_refcount refs_;
grpc_closure on_alarm_;
grpc_cq_completion completion_;
// completion queue where events about this alarm will be posted
grpc_completion_queue* cq_;
Expand Down
69 changes: 18 additions & 51 deletions test/cpp/common/alarm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <mutex>
#include <thread>

#include "absl/log/log.h"
#include "src/core/util/notification.h"
#include "test/core/test_util/test_config.h"

Expand Down Expand Up @@ -307,30 +308,6 @@ TEST(AlarmTest, Cancellation) {
EXPECT_EQ(junk, output_tag);
}

TEST(AlarmTest, CancellationMultiSet) {
// Tests the cancellation and re-Set paths together.
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
Alarm alarm;
// First iteration
alarm.Set(&cq, grpc_timeout_seconds_to_deadline(5), junk);
alarm.Cancel();
void* output_tag;
bool ok;
CompletionQueue::NextStatus status =
cq.AsyncNext(&output_tag, &ok, grpc_timeout_seconds_to_deadline(10));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_FALSE(ok);
EXPECT_EQ(junk, output_tag);
// Second iteration
alarm.Set(&cq, grpc_timeout_seconds_to_deadline(5), junk);
alarm.Cancel();
status = cq.AsyncNext(&output_tag, &ok, grpc_timeout_seconds_to_deadline(10));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_FALSE(ok);
EXPECT_EQ(junk, output_tag);
}

TEST(AlarmTest, CallbackCancellation) {
Alarm alarm;

Expand All @@ -350,33 +327,6 @@ TEST(AlarmTest, CallbackCancellation) {
[c] { return c->completed; }));
}

TEST(AlarmTest, CallbackCancellationMultiSet) {
// Tests the cancellation and re-Set paths.
Alarm alarm;
// First iteration
{
grpc_core::Notification notification;
alarm.Set(std::chrono::system_clock::now() + std::chrono::seconds(10),
[&notification](bool ok) {
EXPECT_FALSE(ok);
notification.Notify();
});
alarm.Cancel();
notification.WaitForNotification();
}
// First iteration
{
grpc_core::Notification notification;
alarm.Set(std::chrono::system_clock::now() + std::chrono::seconds(10),
[&notification](bool ok) {
EXPECT_FALSE(ok);
notification.Notify();
});
alarm.Cancel();
notification.WaitForNotification();
}
}

TEST(AlarmTest, CallbackCancellationLocked) {
Alarm alarm;

Expand Down Expand Up @@ -438,6 +388,23 @@ TEST(AlarmTest, UnsetDestruction) {
Alarm alarm;
}

TEST(AlarmTest, AlarmReuse) {
Alarm alarm;
CompletionQueue cq;
std::thread polling_thread([&]() {
void* tag;
bool ok = false;
while (cq.Next(&tag, &ok)) {
LOG(INFO) << "ok: " << ok;
}
});
while (true) {
alarm.Set(&cq, gpr_timespec{0, 0, GPR_TIMESPAN}, nullptr);
alarm.Cancel();
}
polling_thread.join();
}

} // namespace
} // namespace grpc

Expand Down