Skip to content

Commit 08cad4c

Browse files
sampan-s-nayaksampanedoakes
authored
[Core] Support token auth in ray Pub-Sub (#58333)
This PR adds token-based authentication support to the PythonGcsSubscriber, which previously made direct gRPC calls via the stub without auth. The rest of the pub-sub layer already uses the shared gRPC infrastructure (GrpcServer, GrpcClient), which supports token authentication. --------- Signed-off-by: sampan <[email protected]> Signed-off-by: Edward Oakes <[email protected]> Co-authored-by: sampan <[email protected]> Co-authored-by: Edward Oakes <[email protected]>
1 parent 3658f76 commit 08cad4c

File tree

6 files changed

+389
-5
lines changed

6 files changed

+389
-5
lines changed

src/ray/pubsub/python_gcs_subscriber.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <vector>
2323

2424
#include "ray/gcs_rpc_client/rpc_client.h"
25+
#include "ray/rpc/authentication/authentication_token_loader.h"
2526

2627
namespace ray {
2728
namespace pubsub {
@@ -51,6 +52,7 @@ Status PythonGcsSubscriber::Subscribe() {
5152
}
5253

5354
grpc::ClientContext context;
55+
SetAuthenticationToken(context);
5456

5557
rpc::GcsSubscriberCommandBatchRequest request;
5658
request.set_subscriber_id(subscriber_id_);
@@ -78,6 +80,7 @@ Status PythonGcsSubscriber::DoPoll(int64_t timeout_ms, rpc::PubMessage *message)
7880
return Status::OK();
7981
}
8082
current_polling_context_ = std::make_shared<grpc::ClientContext>();
83+
SetAuthenticationToken(*current_polling_context_);
8184
if (timeout_ms != -1) {
8285
current_polling_context_->set_deadline(std::chrono::system_clock::now() +
8386
std::chrono::milliseconds(timeout_ms));
@@ -173,6 +176,7 @@ Status PythonGcsSubscriber::Close() {
173176
}
174177

175178
grpc::ClientContext context;
179+
SetAuthenticationToken(context);
176180

177181
rpc::GcsSubscriberCommandBatchRequest request;
178182
request.set_subscriber_id(subscriber_id_);
@@ -195,5 +199,12 @@ int64_t PythonGcsSubscriber::last_batch_size() {
195199
return last_batch_size_;
196200
}
197201

202+
void PythonGcsSubscriber::SetAuthenticationToken(grpc::ClientContext &context) {
203+
auto auth_token = ray::rpc::AuthenticationTokenLoader::instance().GetToken();
204+
if (auth_token.has_value() && !auth_token->empty()) {
205+
auth_token->SetMetadata(context);
206+
}
207+
}
208+
198209
} // namespace pubsub
199210
} // namespace ray

src/ray/pubsub/python_gcs_subscriber.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ class RAY_EXPORT PythonGcsSubscriber {
8080
std::deque<rpc::PubMessage> queue_ ABSL_GUARDED_BY(mu_);
8181
bool closed_ ABSL_GUARDED_BY(mu_) = false;
8282
std::shared_ptr<grpc::ClientContext> current_polling_context_ ABSL_GUARDED_BY(mu_);
83+
84+
// Set authentication token on a gRPC client context if token-based authentication is
85+
// enabled
86+
void SetAuthenticationToken(grpc::ClientContext &context);
8387
};
8488

8589
/// Get the .lines() attribute of a LogBatch as a std::vector

src/ray/pubsub/tests/BUILD.bazel

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,20 @@ ray_cc_test(
4040
"@com_google_googletest//:gtest_main",
4141
],
4242
)
43+
44+
ray_cc_test(
45+
name = "python_gcs_subscriber_auth_test",
46+
size = "small",
47+
srcs = ["python_gcs_subscriber_auth_test.cc"],
48+
tags = ["team:core"],
49+
deps = [
50+
"//src/ray/common:ray_config",
51+
"//src/ray/common:status",
52+
"//src/ray/protobuf:gcs_service_cc_grpc",
53+
"//src/ray/pubsub:python_gcs_subscriber",
54+
"//src/ray/rpc:grpc_server",
55+
"//src/ray/rpc/authentication:authentication_token",
56+
"//src/ray/rpc/authentication:authentication_token_loader",
57+
"@com_google_googletest//:gtest_main",
58+
],
59+
)

0 commit comments

Comments
 (0)