Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ message MetadataOptions {

// Describes which typed or untyped filter dynamic metadata namespaces to accept from
// the external processing server. Set to empty or leave unset to disallow writing
// any received dynamic metadata. Receiving of typed metadata is not supported.
// any received dynamic metadata.
MetadataNamespaces receiving_namespaces = 2;

// Describes which cluster metadata namespaces to forward to
Expand Down
11 changes: 10 additions & 1 deletion api/envoy/service/ext_proc/v3/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "envoy/config/core/v3/base.proto";
import "envoy/extensions/filters/http/ext_proc/v3/processing_mode.proto";
import "envoy/type/v3/http_status.proto";

import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

Expand Down Expand Up @@ -159,7 +160,7 @@ message ProcessingRequest {
// the server must send back exactly one ``ProcessingResponse`` message.
// * If it is set to ``FULL_DUPLEX_STREAMED``, the server must follow the API defined
// for this mode to send the ``ProcessingResponse`` messages.
// [#next-free-field: 13]
// [#next-free-field: 14]
message ProcessingResponse {
// The response type that is sent by the server.
oneof response {
Expand Down Expand Up @@ -220,6 +221,14 @@ message ProcessingResponse {
// field name(s) of the struct.
google.protobuf.Struct dynamic_metadata = 8;

// Optional typed metadata that will be emitted as dynamic metadata to be consumed by
// following filters. This metadata will be placed in the namespace(s) specified by the
// keys of the map.
//
// Typed dynamic metadata should be preferred over untyped dynamic metadata (``dynamic_metadata``)
// because it is more efficient and more type-safe.
map<string, google.protobuf.Any> typed_dynamic_metadata = 13;

// Override how parts of the HTTP request and response are processed for the duration of this
// particular request/response only. Servers may use this to intelligently control how requests
// are processed based on the headers and other metadata that they see.
Expand Down
95 changes: 72 additions & 23 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ initUntypedReceivingNamespaces(const ExtProcPerRoute& config) {

return {initNamespaces(config.overrides().metadata_options().receiving_namespaces().untyped())};
}
std::optional<std::vector<std::string>>
initTypedReceivingNamespaces(const ExtProcPerRoute& config) {
if (!config.has_overrides() || !config.overrides().has_metadata_options() ||
!config.overrides().metadata_options().has_receiving_namespaces()) {
return std::nullopt;
}

return {initNamespaces(config.overrides().metadata_options().receiving_namespaces().typed())};
}

std::optional<std::vector<std::string>>
initUntypedClusterMetadataForwardingNamespaces(const ExtProcPerRoute& config) {
if (!config.has_overrides() || !config.overrides().has_metadata_options() ||
Expand Down Expand Up @@ -268,6 +278,8 @@ FilterConfig::FilterConfig(const ExternalProcessor& config,
untyped_receiving_namespaces_(
config.metadata_options().receiving_namespaces().untyped().begin(),
config.metadata_options().receiving_namespaces().untyped().end()),
typed_receiving_namespaces_(config.metadata_options().receiving_namespaces().typed().begin(),
config.metadata_options().receiving_namespaces().typed().end()),
untyped_cluster_metadata_forwarding_namespaces_(
config.metadata_options().cluster_metadata_forwarding_namespaces().untyped().begin(),
config.metadata_options().cluster_metadata_forwarding_namespaces().untyped().end()),
Expand Down Expand Up @@ -635,6 +647,7 @@ FilterConfigPerRoute::FilterConfigPerRoute(
untyped_forwarding_namespaces_(initUntypedForwardingNamespaces(config)),
typed_forwarding_namespaces_(initTypedForwardingNamespaces(config)),
untyped_receiving_namespaces_(initUntypedReceivingNamespaces(config)),
typed_receiving_namespaces_(initTypedReceivingNamespaces(config)),
untyped_cluster_metadata_forwarding_namespaces_(
initUntypedClusterMetadataForwardingNamespaces(config)),
typed_cluster_metadata_forwarding_namespaces_(
Expand All @@ -661,6 +674,9 @@ FilterConfigPerRoute::FilterConfigPerRoute(const FilterConfigPerRoute& less_spec
untyped_receiving_namespaces_(more_specific.untypedReceivingMetadataNamespaces().has_value()
? more_specific.untypedReceivingMetadataNamespaces()
: less_specific.untypedReceivingMetadataNamespaces()),
typed_receiving_namespaces_(more_specific.typedReceivingMetadataNamespaces().has_value()
? more_specific.typedReceivingMetadataNamespaces()
: less_specific.typedReceivingMetadataNamespaces()),
untyped_cluster_metadata_forwarding_namespaces_(
more_specific.untypedClusterMetadataForwardingNamespaces().has_value()
? more_specific.untypedClusterMetadataForwardingNamespaces()
Expand Down Expand Up @@ -1621,33 +1637,57 @@ void Filter::addAttributes(ProcessorState& state, ProcessingRequest& req) {

void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state,
const ProcessingResponse& response) {
if (state.untypedReceivingMetadataNamespaces().empty() || !response.has_dynamic_metadata()) {
if (response.has_dynamic_metadata()) {
ENVOY_STREAM_LOG(debug,
"processing response included dynamic metadata, but no receiving "
"namespaces are configured.",
*decoder_callbacks_);
// Handle untyped metadata
if (!state.untypedReceivingMetadataNamespaces().empty() && response.has_dynamic_metadata()) {
const auto& response_metadata = response.dynamic_metadata().fields();
const auto& receiving_namespaces = state.untypedReceivingMetadataNamespaces();
for (const auto& context_key : response_metadata) {
bool found_allowed_namespace = false;
if (auto metadata_it = std::find(receiving_namespaces.begin(), receiving_namespaces.end(),
context_key.first);
metadata_it != receiving_namespaces.end()) {
cb->streamInfo().setDynamicMetadata(context_key.first, context_key.second.struct_value());
found_allowed_namespace = true;
}
if (!found_allowed_namespace) {
ENVOY_STREAM_LOG(debug,
"processing response included dynamic metadata for namespace not "
"configured for receiving: {}",
*decoder_callbacks_, context_key.first);
}
}
return;
} else if (response.has_dynamic_metadata()) {
ENVOY_STREAM_LOG(debug,
"processing response included dynamic metadata, but no receiving "
"namespaces are configured.",
*decoder_callbacks_);
}

const auto& response_metadata = response.dynamic_metadata().fields();
const auto& receiving_namespaces = state.untypedReceivingMetadataNamespaces();
for (const auto& context_key : response_metadata) {
bool found_allowed_namespace = false;
if (auto metadata_it =
std::find(receiving_namespaces.begin(), receiving_namespaces.end(), context_key.first);
metadata_it != receiving_namespaces.end()) {
cb->streamInfo().setDynamicMetadata(context_key.first,
response_metadata.at(context_key.first).struct_value());
found_allowed_namespace = true;
}
if (!found_allowed_namespace) {
ENVOY_STREAM_LOG(debug,
"processing response included dynamic metadata for namespace not "
"configured for receiving: {}",
*decoder_callbacks_, context_key.first);
// Handle typed metadata
if (!state.typedReceivingMetadataNamespaces().empty() &&
!response.typed_dynamic_metadata().empty()) {
const auto& response_typed_metadata = response.typed_dynamic_metadata();
const auto& receiving_typed_namespaces = state.typedReceivingMetadataNamespaces();
for (const auto& context_key : response_typed_metadata) {
bool found_allowed_namespace = false;
if (auto metadata_it = std::find(receiving_typed_namespaces.begin(),
receiving_typed_namespaces.end(), context_key.first);
metadata_it != receiving_typed_namespaces.end()) {
cb->streamInfo().setDynamicTypedMetadata(context_key.first, context_key.second);
found_allowed_namespace = true;
}
if (!found_allowed_namespace) {
ENVOY_STREAM_LOG(debug,
"processing response included typed dynamic metadata for namespace not "
"configured for receiving: {}",
*decoder_callbacks_, context_key.first);
}
}
} else if (!response.typed_dynamic_metadata().empty()) {
ENVOY_STREAM_LOG(debug,
"processing response included typed dynamic metadata, but no typed receiving "
"namespaces are configured.",
*decoder_callbacks_);
}
}

Expand Down Expand Up @@ -2195,6 +2235,15 @@ void Filter::mergePerRouteConfig() {
encoding_state_.setUntypedReceivingMetadataNamespaces(untyped_receiving_namespaces_);
}

if (merged_config->typedReceivingMetadataNamespaces().has_value()) {
typed_receiving_namespaces_ = merged_config->typedReceivingMetadataNamespaces().value();
ENVOY_STREAM_LOG(trace,
"Setting new typed receiving metadata namespaces from per-route configuration",
*decoder_callbacks_);
decoding_state_.setTypedReceivingMetadataNamespaces(typed_receiving_namespaces_);
encoding_state_.setTypedReceivingMetadataNamespaces(typed_receiving_namespaces_);
}

if (merged_config->untypedClusterMetadataForwardingNamespaces().has_value()) {
untyped_cluster_metadata_forwarding_namespaces_ =
merged_config->untypedClusterMetadataForwardingNamespaces().value();
Expand Down
12 changes: 12 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ class FilterConfig {
return untyped_receiving_namespaces_;
}

const std::vector<std::string>& typedReceivingMetadataNamespaces() const {
return typed_receiving_namespaces_;
}

const std::vector<std::string>& untypedClusterMetadataForwardingNamespaces() const {
return untyped_cluster_metadata_forwarding_namespaces_;
}
Expand Down Expand Up @@ -371,6 +375,7 @@ class FilterConfig {
const std::vector<std::string> untyped_forwarding_namespaces_;
const std::vector<std::string> typed_forwarding_namespaces_;
const std::vector<std::string> untyped_receiving_namespaces_;
const std::vector<std::string> typed_receiving_namespaces_;
const std::vector<std::string> untyped_cluster_metadata_forwarding_namespaces_;
const std::vector<std::string> typed_cluster_metadata_forwarding_namespaces_;
// Empty allowed_header_ means allow all.
Expand Down Expand Up @@ -451,6 +456,9 @@ class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig {
const std::optional<const std::vector<std::string>>& untypedReceivingMetadataNamespaces() const {
return untyped_receiving_namespaces_;
}
const std::optional<const std::vector<std::string>>& typedReceivingMetadataNamespaces() const {
return typed_receiving_namespaces_;
}
const std::optional<const std::vector<std::string>>&
untypedClusterMetadataForwardingNamespaces() const {
return untyped_cluster_metadata_forwarding_namespaces_;
Expand Down Expand Up @@ -482,6 +490,7 @@ class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig {
const std::optional<const std::vector<std::string>> untyped_forwarding_namespaces_;
const std::optional<const std::vector<std::string>> typed_forwarding_namespaces_;
const std::optional<const std::vector<std::string>> untyped_receiving_namespaces_;
const std::optional<const std::vector<std::string>> typed_receiving_namespaces_;
const std::optional<const std::vector<std::string>>
untyped_cluster_metadata_forwarding_namespaces_;
const std::optional<const std::vector<std::string>> typed_cluster_metadata_forwarding_namespaces_;
Expand Down Expand Up @@ -517,12 +526,14 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
*this, config->processingMode(), config->untypedForwardingMetadataNamespaces(),
config->typedForwardingMetadataNamespaces(),
config->untypedReceivingMetadataNamespaces(),
config->typedReceivingMetadataNamespaces(),
config->untypedClusterMetadataForwardingNamespaces(),
config->typedClusterMetadataForwardingNamespaces(), config->keepContentLength()),
encoding_state_(
*this, config->processingMode(), config->untypedForwardingMetadataNamespaces(),
config->typedForwardingMetadataNamespaces(),
config->untypedReceivingMetadataNamespaces(),
config->typedReceivingMetadataNamespaces(),
config->untypedClusterMetadataForwardingNamespaces(),
config->typedClusterMetadataForwardingNamespaces(), config->keepContentLength()),
processing_request_modifier_(config->createProcessingRequestModifier()),
Expand Down Expand Up @@ -689,6 +700,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
std::vector<std::string> untyped_forwarding_namespaces_;
std::vector<std::string> typed_forwarding_namespaces_;
std::vector<std::string> untyped_receiving_namespaces_;
std::vector<std::string> typed_receiving_namespaces_;
std::vector<std::string> untyped_cluster_metadata_forwarding_namespaces_;
std::vector<std::string> typed_cluster_metadata_forwarding_namespaces_;
Http::StreamFilterCallbacks* filter_callbacks_;
Expand Down
18 changes: 16 additions & 2 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
const std::vector<std::string>& untyped_forwarding_namespaces,
const std::vector<std::string>& typed_forwarding_namespaces,
const std::vector<std::string>& untyped_receiving_namespaces,
const std::vector<std::string>& typed_receiving_namespaces,
const std::vector<std::string>& untyped_cluster_metadata_forwarding_namespaces,
const std::vector<std::string>& typed_cluster_metadata_forwarding_namespaces,
bool allow_content_length_header)
Expand All @@ -97,6 +98,7 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
untyped_forwarding_namespaces_(&untyped_forwarding_namespaces),
typed_forwarding_namespaces_(&typed_forwarding_namespaces),
untyped_receiving_namespaces_(&untyped_receiving_namespaces),
typed_receiving_namespaces_(&typed_receiving_namespaces),
untyped_cluster_metadata_forwarding_namespaces_(
&untyped_cluster_metadata_forwarding_namespaces),
typed_cluster_metadata_forwarding_namespaces_(
Expand Down Expand Up @@ -153,6 +155,13 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
void setUntypedReceivingMetadataNamespaces(const std::vector<std::string>& ns) {
untyped_receiving_namespaces_ = &ns;
};

const std::vector<std::string>& typedReceivingMetadataNamespaces() const {
return *typed_receiving_namespaces_;
};
void setTypedReceivingMetadataNamespaces(const std::vector<std::string>& ns) {
typed_receiving_namespaces_ = &ns;
};
const std::vector<std::string>& untypedClusterMetadataForwardingNamespaces() const {
return *untyped_cluster_metadata_forwarding_namespaces_;
}
Expand Down Expand Up @@ -404,6 +413,7 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
const std::vector<std::string>* untyped_forwarding_namespaces_{};
const std::vector<std::string>* typed_forwarding_namespaces_{};
const std::vector<std::string>* untyped_receiving_namespaces_{};
const std::vector<std::string>* typed_receiving_namespaces_{};
const std::vector<std::string>* untyped_cluster_metadata_forwarding_namespaces_{};
const std::vector<std::string>* typed_cluster_metadata_forwarding_namespaces_{};

Expand Down Expand Up @@ -533,12 +543,14 @@ class DecodingProcessorState : public ProcessorState {
const std::vector<std::string>& untyped_forwarding_namespaces,
const std::vector<std::string>& typed_forwarding_namespaces,
const std::vector<std::string>& untyped_receiving_namespaces,
const std::vector<std::string>& typed_receiving_namespaces,
const std::vector<std::string>& untyped_cluster_metadata_forwarding_namespaces,
const std::vector<std::string>& typed_cluster_metadata_forwarding_namespaces,
bool allow_content_length_header)
: ProcessorState(filter, envoy::config::core::v3::TrafficDirection::INBOUND,
untyped_forwarding_namespaces, typed_forwarding_namespaces,
untyped_receiving_namespaces, untyped_cluster_metadata_forwarding_namespaces,
untyped_receiving_namespaces, typed_receiving_namespaces,
untyped_cluster_metadata_forwarding_namespaces,
typed_cluster_metadata_forwarding_namespaces, allow_content_length_header) {
setProcessingModeInternal(mode);
}
Expand Down Expand Up @@ -681,12 +693,14 @@ class EncodingProcessorState : public ProcessorState {
const std::vector<std::string>& untyped_forwarding_namespaces,
const std::vector<std::string>& typed_forwarding_namespaces,
const std::vector<std::string>& untyped_receiving_namespaces,
const std::vector<std::string>& typed_receiving_namespaces,
const std::vector<std::string>& untyped_cluster_metadata_forwarding_namespaces,
const std::vector<std::string>& typed_cluster_metadata_forwarding_namespaces,
bool allow_content_length_header)
: ProcessorState(filter, envoy::config::core::v3::TrafficDirection::OUTBOUND,
untyped_forwarding_namespaces, typed_forwarding_namespaces,
untyped_receiving_namespaces, untyped_cluster_metadata_forwarding_namespaces,
untyped_receiving_namespaces, typed_receiving_namespaces,
untyped_cluster_metadata_forwarding_namespaces,
typed_cluster_metadata_forwarding_namespaces, allow_content_length_header) {
setProcessingModeInternal(mode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,25 @@ void ExtProcIntegrationTest::testSendDyanmicMetadata() {
});
}

void ExtProcIntegrationTest::testSendTypedDyanmicMetadata() {
envoy::extensions::filters::http::set_metadata::v3::Metadata typed_md_to_stuff;
typed_md_to_stuff.set_metadata_namespace("typed_value from ext_proc");

Protobuf::Any typed_val;
typed_val.PackFrom(typed_md_to_stuff);

processGenericMessage(*grpc_upstreams_[0], true,
[typed_val](const ProcessingRequest&, ProcessingResponse& resp) {
// Spoof the response to contain receiving typed metadata.
HeadersResponse headers_resp;
(*resp.mutable_request_headers()) = headers_resp;
auto mut_typed_md = resp.mutable_typed_dynamic_metadata();
(*mut_typed_md)["receiving_ns_typed"] = typed_val;

return true;
});
}

void ExtProcIntegrationTest::testSidestreamPushbackDownstream(uint32_t body_size,
bool check_downstream_flow_control) {
config_helper_.setBufferLimits(1024, 1024);
Expand Down
Loading
Loading