Skip to content

Commit

Permalink
Merge pull request grpc#16988 from vjpai/server_callback
Browse files Browse the repository at this point in the history
C++: Experimental server callback unary API
  • Loading branch information
vjpai authored Nov 1, 2018
2 parents 5a8b5e8 + 932abf4 commit d35a7c4
Show file tree
Hide file tree
Showing 33 changed files with 1,378 additions and 124 deletions.
2 changes: 2 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ GRPCXX_PUBLIC_HDRS = [
"include/grpcpp/support/config.h",
"include/grpcpp/support/proto_buffer_reader.h",
"include/grpcpp/support/proto_buffer_writer.h",
"include/grpcpp/support/server_callback.h",
"include/grpcpp/support/slice.h",
"include/grpcpp/support/status.h",
"include/grpcpp/support/status_code_enum.h",
Expand Down Expand Up @@ -2088,6 +2089,7 @@ grpc_cc_library(
"include/grpcpp/impl/codegen/rpc_service_method.h",
"include/grpcpp/impl/codegen/security/auth_context.h",
"include/grpcpp/impl/codegen/serialization_traits.h",
"include/grpcpp/impl/codegen/server_callback.h",
"include/grpcpp/impl/codegen/server_context.h",
"include/grpcpp/impl/codegen/server_interceptor.h",
"include/grpcpp/impl/codegen/server_interface.h",
Expand Down
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3020,6 +3020,7 @@ foreach(_hdr
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
include/grpcpp/support/server_callback.h
include/grpcpp/support/slice.h
include/grpcpp/support/status.h
include/grpcpp/support/status_code_enum.h
Expand Down Expand Up @@ -3137,6 +3138,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/rpc_service_method.h
include/grpcpp/impl/codegen/security/auth_context.h
include/grpcpp/impl/codegen/serialization_traits.h
include/grpcpp/impl/codegen/server_callback.h
include/grpcpp/impl/codegen/server_context.h
include/grpcpp/impl/codegen/server_interceptor.h
include/grpcpp/impl/codegen/server_interface.h
Expand Down Expand Up @@ -3600,6 +3602,7 @@ foreach(_hdr
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
include/grpcpp/support/server_callback.h
include/grpcpp/support/slice.h
include/grpcpp/support/status.h
include/grpcpp/support/status_code_enum.h
Expand Down Expand Up @@ -3717,6 +3720,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/rpc_service_method.h
include/grpcpp/impl/codegen/security/auth_context.h
include/grpcpp/impl/codegen/serialization_traits.h
include/grpcpp/impl/codegen/server_callback.h
include/grpcpp/impl/codegen/server_context.h
include/grpcpp/impl/codegen/server_interceptor.h
include/grpcpp/impl/codegen/server_interface.h
Expand Down Expand Up @@ -4131,6 +4135,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/rpc_service_method.h
include/grpcpp/impl/codegen/security/auth_context.h
include/grpcpp/impl/codegen/serialization_traits.h
include/grpcpp/impl/codegen/server_callback.h
include/grpcpp/impl/codegen/server_context.h
include/grpcpp/impl/codegen/server_interceptor.h
include/grpcpp/impl/codegen/server_interface.h
Expand Down Expand Up @@ -4317,6 +4322,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/rpc_service_method.h
include/grpcpp/impl/codegen/security/auth_context.h
include/grpcpp/impl/codegen/serialization_traits.h
include/grpcpp/impl/codegen/server_callback.h
include/grpcpp/impl/codegen/server_context.h
include/grpcpp/impl/codegen/server_interceptor.h
include/grpcpp/impl/codegen/server_interface.h
Expand Down Expand Up @@ -4530,6 +4536,7 @@ foreach(_hdr
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
include/grpcpp/support/server_callback.h
include/grpcpp/support/slice.h
include/grpcpp/support/status.h
include/grpcpp/support/status_code_enum.h
Expand Down Expand Up @@ -4647,6 +4654,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/rpc_service_method.h
include/grpcpp/impl/codegen/security/auth_context.h
include/grpcpp/impl/codegen/serialization_traits.h
include/grpcpp/impl/codegen/server_callback.h
include/grpcpp/impl/codegen/server_context.h
include/grpcpp/impl/codegen/server_interceptor.h
include/grpcpp/impl/codegen/server_interface.h
Expand Down
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5371,6 +5371,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
include/grpcpp/support/server_callback.h \
include/grpcpp/support/slice.h \
include/grpcpp/support/status.h \
include/grpcpp/support/status_code_enum.h \
Expand Down Expand Up @@ -5488,6 +5489,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/rpc_service_method.h \
include/grpcpp/impl/codegen/security/auth_context.h \
include/grpcpp/impl/codegen/serialization_traits.h \
include/grpcpp/impl/codegen/server_callback.h \
include/grpcpp/impl/codegen/server_context.h \
include/grpcpp/impl/codegen/server_interceptor.h \
include/grpcpp/impl/codegen/server_interface.h \
Expand Down Expand Up @@ -5960,6 +5962,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
include/grpcpp/support/server_callback.h \
include/grpcpp/support/slice.h \
include/grpcpp/support/status.h \
include/grpcpp/support/status_code_enum.h \
Expand Down Expand Up @@ -6077,6 +6080,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/rpc_service_method.h \
include/grpcpp/impl/codegen/security/auth_context.h \
include/grpcpp/impl/codegen/serialization_traits.h \
include/grpcpp/impl/codegen/server_callback.h \
include/grpcpp/impl/codegen/server_context.h \
include/grpcpp/impl/codegen/server_interceptor.h \
include/grpcpp/impl/codegen/server_interface.h \
Expand Down Expand Up @@ -6476,6 +6480,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/rpc_service_method.h \
include/grpcpp/impl/codegen/security/auth_context.h \
include/grpcpp/impl/codegen/serialization_traits.h \
include/grpcpp/impl/codegen/server_callback.h \
include/grpcpp/impl/codegen/server_context.h \
include/grpcpp/impl/codegen/server_interceptor.h \
include/grpcpp/impl/codegen/server_interface.h \
Expand Down Expand Up @@ -6639,6 +6644,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/rpc_service_method.h \
include/grpcpp/impl/codegen/security/auth_context.h \
include/grpcpp/impl/codegen/serialization_traits.h \
include/grpcpp/impl/codegen/server_callback.h \
include/grpcpp/impl/codegen/server_context.h \
include/grpcpp/impl/codegen/server_interceptor.h \
include/grpcpp/impl/codegen/server_interface.h \
Expand Down Expand Up @@ -6857,6 +6863,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
include/grpcpp/support/server_callback.h \
include/grpcpp/support/slice.h \
include/grpcpp/support/status.h \
include/grpcpp/support/status_code_enum.h \
Expand Down Expand Up @@ -6974,6 +6981,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/rpc_service_method.h \
include/grpcpp/impl/codegen/security/auth_context.h \
include/grpcpp/impl/codegen/serialization_traits.h \
include/grpcpp/impl/codegen/server_callback.h \
include/grpcpp/impl/codegen/server_context.h \
include/grpcpp/impl/codegen/server_interceptor.h \
include/grpcpp/impl/codegen/server_interface.h \
Expand Down
2 changes: 2 additions & 0 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,7 @@ filegroups:
- include/grpcpp/impl/codegen/rpc_service_method.h
- include/grpcpp/impl/codegen/security/auth_context.h
- include/grpcpp/impl/codegen/serialization_traits.h
- include/grpcpp/impl/codegen/server_callback.h
- include/grpcpp/impl/codegen/server_context.h
- include/grpcpp/impl/codegen/server_interceptor.h
- include/grpcpp/impl/codegen/server_interface.h
Expand Down Expand Up @@ -1363,6 +1364,7 @@ filegroups:
- include/grpcpp/support/config.h
- include/grpcpp/support/proto_buffer_reader.h
- include/grpcpp/support/proto_buffer_writer.h
- include/grpcpp/support/server_callback.h
- include/grpcpp/support/slice.h
- include/grpcpp/support/status.h
- include/grpcpp/support/status_code_enum.h
Expand Down
2 changes: 2 additions & 0 deletions gRPC-C++.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ Pod::Spec.new do |s|
'include/grpcpp/support/config.h',
'include/grpcpp/support/proto_buffer_reader.h',
'include/grpcpp/support/proto_buffer_writer.h',
'include/grpcpp/support/server_callback.h',
'include/grpcpp/support/slice.h',
'include/grpcpp/support/status.h',
'include/grpcpp/support/status_code_enum.h',
Expand Down Expand Up @@ -154,6 +155,7 @@ Pod::Spec.new do |s|
'include/grpcpp/impl/codegen/rpc_service_method.h',
'include/grpcpp/impl/codegen/security/auth_context.h',
'include/grpcpp/impl/codegen/serialization_traits.h',
'include/grpcpp/impl/codegen/server_callback.h',
'include/grpcpp/impl/codegen/server_context.h',
'include/grpcpp/impl/codegen/server_interceptor.h',
'include/grpcpp/impl/codegen/server_interface.h',
Expand Down
2 changes: 1 addition & 1 deletion include/grpcpp/impl/codegen/async_unary_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ class ServerAsyncResponseWriter final
/// metadata.
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.set_output_tag(tag);
finish_buf_.set_cq_tag(&finish_buf_);
finish_buf_.set_core_cq_tag(&finish_buf_);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
Expand Down
4 changes: 4 additions & 0 deletions include/grpcpp/impl/codegen/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class CallbackUnaryHandler;
template <StatusCode code>
class ErrorMethodHandler;
template <class R>
Expand Down Expand Up @@ -154,6 +156,8 @@ class ByteBuffer final {
friend class internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class internal::ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class internal::CallbackUnaryHandler;
template <StatusCode code>
friend class internal::ErrorMethodHandler;
template <class R>
Expand Down
22 changes: 11 additions & 11 deletions include/grpcpp/impl/codegen/call_op_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -770,19 +770,19 @@ class CallOpSet : public CallOpSetInterface,
public Op5,
public Op6 {
public:
CallOpSet() : cq_tag_(this), return_tag_(this) {}
CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
// The copy constructor and assignment operator reset the value of
// cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_ since
// those are only meaningful on a specific object, not across objects.
// core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
// since those are only meaningful on a specific object, not across objects.
CallOpSet(const CallOpSet& other)
: cq_tag_(this),
: core_cq_tag_(this),
return_tag_(this),
call_(other.call_),
done_intercepting_(false),
interceptor_methods_(InterceptorBatchMethodsImpl()) {}

CallOpSet& operator=(const CallOpSet& other) {
cq_tag_ = this;
core_cq_tag_ = this;
return_tag_ = this;
call_ = other.call_;
done_intercepting_ = false;
Expand Down Expand Up @@ -834,13 +834,13 @@ class CallOpSet : public CallOpSetInterface,

void set_output_tag(void* return_tag) { return_tag_ = return_tag; }

void* cq_tag() override { return cq_tag_; }
void* core_cq_tag() override { return core_cq_tag_; }

/// set_cq_tag is used to provide a different core CQ tag than "this".
/// set_core_cq_tag is used to provide a different core CQ tag than "this".
/// This is used for callback-based tags, where the core tag is the core
/// callback function. It does not change the use or behavior of any other
/// function (such as FinalizeResult)
void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }

// This will be called while interceptors are run if the RPC is a hijacked
// RPC. This should set hijacking state for each of the ops.
Expand All @@ -866,7 +866,7 @@ class CallOpSet : public CallOpSetInterface,
this->Op6::AddOp(ops, &nops);
GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
g_core_codegen_interface->grpc_call_start_batch(
call_.call(), ops, nops, cq_tag(), nullptr));
call_.call(), ops, nops, core_cq_tag(), nullptr));
}

// Should be called after interceptors are done running on the finalize result
Expand All @@ -875,7 +875,7 @@ class CallOpSet : public CallOpSetInterface,
done_intercepting_ = true;
GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
g_core_codegen_interface->grpc_call_start_batch(
call_.call(), nullptr, 0, cq_tag(), nullptr));
call_.call(), nullptr, 0, core_cq_tag(), nullptr));
}

private:
Expand Down Expand Up @@ -906,7 +906,7 @@ class CallOpSet : public CallOpSetInterface,
return interceptor_methods_.RunInterceptors();
}

void* cq_tag_;
void* core_cq_tag_;
void* return_tag_;
Call call_;
bool done_intercepting_ = false;
Expand Down
4 changes: 2 additions & 2 deletions include/grpcpp/impl/codegen/call_op_set_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ class CallOpSetInterface : public CompletionQueueTag {
virtual void FillOps(internal::Call* call) = 0;

/// Get the tag to be used at the core completion queue. Generally, the
/// value of cq_tag will be "this". However, it can be overridden if we
/// value of core_cq_tag will be "this". However, it can be overridden if we
/// want core to process the tag differently (e.g., as a core callback)
virtual void* cq_tag() = 0;
virtual void* core_cq_tag() = 0;

// This will be called while interceptors are run if the RPC is a hijacked
// RPC. This should set hijacking state for each of the ops.
Expand Down
26 changes: 19 additions & 7 deletions include/grpcpp/impl/codegen/callback_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ class CallbackWithStatusTag
GPR_CODEGEN_ASSERT(ignored == ops_);

// Last use of func_ or status_, so ok to move them out
CatchingCallback(std::move(func_), std::move(status_));

auto func = std::move(func_);
auto status = std::move(status_);
func_ = nullptr; // reset to clear this out for sure
status_ = Status(); // reset to clear this out for sure
CatchingCallback(std::move(func), std::move(status));
g_core_codegen_interface->grpc_call_unref(call_);
}
};
Expand All @@ -124,6 +125,8 @@ class CallbackWithSuccessTag
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { assert(0); }

CallbackWithSuccessTag() : call_(nullptr), ops_(nullptr) {}

CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f,
CompletionQueueTag* ops)
: call_(call), func_(std::move(f)), ops_(ops) {
Expand All @@ -138,6 +141,9 @@ class CallbackWithSuccessTag
// that are detected before the operations are internally processed.
void force_run(bool ok) { Run(ok); }

/// check if this tag has ever been set
operator bool() const { return call_ != nullptr; }

private:
grpc_call* call_;
std::function<void(bool)> func_;
Expand All @@ -150,13 +156,19 @@ class CallbackWithSuccessTag
void Run(bool ok) {
void* ignored = ops_;
bool new_ok = ok;
GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &new_ok));
// Allow a "false" return value from FinalizeResult to silence the
// callback, just as it silences a CQ tag in the async cases
bool do_callback = ops_->FinalizeResult(&ignored, &new_ok);
GPR_CODEGEN_ASSERT(ignored == ops_);

// Last use of func_, so ok to move it out for rvalue call above
CatchingCallback(std::move(func_), ok);

func_ = nullptr; // reset to clear this out for sure
if (do_callback) {
// Last use of func_, so ok to move it out for rvalue call above
auto func = std::move(func_);
func_ = nullptr; // reset to clear this out for sure
CatchingCallback(std::move(func), ok);
} else {
func_ = nullptr; // reset to clear this out for sure
}
g_core_codegen_interface->grpc_call_unref(call_);
}
};
Expand Down
2 changes: 1 addition & 1 deletion include/grpcpp/impl/codegen/channel_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class ChannelInterface {
// channel. If the return value is nullptr, this channel doesn't support
// callback operations.
// TODO(vjpai): Consider a better default like using a global CQ
// Returns nullptr (rather than being pure) since this is a new method
// Returns nullptr (rather than being pure) since this is a post-1.0 method
// and adding a new pure method to an interface would be a breaking change
// (even though this is private and non-API)
virtual CompletionQueue* CallbackCQ() { return nullptr; }
Expand Down
2 changes: 1 addition & 1 deletion include/grpcpp/impl/codegen/client_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class CallbackUnaryCallImpl {
ops->AllowNoMessage();
ops->ClientSendClose();
ops->ClientRecvStatus(context, tag->status_ptr());
ops->set_cq_tag(tag);
ops->set_core_cq_tag(tag);
call.PerformOps(ops);
}
};
Expand Down
10 changes: 8 additions & 2 deletions include/grpcpp/impl/codegen/completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,18 @@ class ServerCompletionQueue : public CompletionQueue {
ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {}

private:
/// \param completion_type indicates whether this is a NEXT or CALLBACK
/// completion queue.
/// \param polling_type Informs the GRPC library about the type of polling
/// allowed on this completion queue. See grpc_cq_polling_type's description
/// in grpc_types.h for more details.
ServerCompletionQueue(grpc_cq_polling_type polling_type)
/// \param shutdown_cb is the shutdown callback used for CALLBACK api queues
ServerCompletionQueue(grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type,
grpc_experimental_completion_queue_functor* shutdown_cb)
: CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, polling_type, nullptr}),
GRPC_CQ_CURRENT_VERSION, completion_type, polling_type,
shutdown_cb}),
polling_type_(polling_type) {}

grpc_cq_polling_type polling_type_;
Expand Down
Loading

0 comments on commit d35a7c4

Please sign in to comment.