Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ builtinAuthenticationProfiles:
type: string
required: false
description: |
This maintains backwards compatibility with existing fields.
This maintains backwards compatibility with existing fields.
It will be deprecated as of Dapr 1.17. Use 'accessKey' instead.
If both fields are set, then 'accessKey' value will be used.
AWS access key associated with an IAM account.
Expand All @@ -43,7 +43,7 @@ builtinAuthenticationProfiles:
required: false
sensitive: true
description: |
This maintains backwards compatibility with existing fields.
This maintains backwards compatibility with existing fields.
It will be deprecated as of Dapr 1.17. Use 'secretKey' instead.
If both fields are set, then 'secretKey' value will be used.
The secret key associated with the access key.
Expand All @@ -52,7 +52,7 @@ builtinAuthenticationProfiles:
type: string
sensitive: true
description: |
This maintains backwards compatibility with existing fields.
This maintains backwards compatibility with existing fields.
It will be deprecated as of Dapr 1.17. Use 'sessionToken' instead.
If both fields are set, then 'sessionToken' value will be used.
AWS session token to use. A session token is only required if you are using temporary security credentials.
Expand All @@ -61,15 +61,15 @@ builtinAuthenticationProfiles:
type: string
required: false
description: |
This maintains backwards compatibility with existing fields.
This maintains backwards compatibility with existing fields.
It will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead.
If both fields are set, then 'assumeRoleArn' value will be used.
IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.
example: '"arn:aws:iam::123456789:role/mskRole"'
- name: awsStsSessionName
type: string
description: |
This maintains backwards compatibility with existing fields.
This maintains backwards compatibility with existing fields.
It will be deprecated as of Dapr 1.17. Use 'sessionName' instead.
If both fields are set, then 'sessionName' value will be used.
Represents the session name for assuming a role.
Expand Down Expand Up @@ -121,6 +121,50 @@ authenticationProfiles:
example: |
{"cluster":"kafka","poolid":"kafkapool"}
type: string
- name: oidcClientAuthMethod
type: string
required: false
default: '"client_secret"'
description: |
Client authentication method at the token endpoint.
The "client_secret" method (default) uses the client_id and client_secret to authenticate.
The "client_jwt" method uses a JWT client assertion to authenticate.
example: '"client_jwt"'
allowedValues:
- "client_secret"
- "client_jwt"
- name: oidcClientAssertionCert
type: string
required: false
description: |
Required if "oidcClientAuthMethod" is set to "client_jwt".
PEM-encoded X.509 certificate used to advertise the key in the x5c header when using client_jwt.
example: |
-----BEGIN CERTIFICATE-----\n...
- name: oidcClientAssertionKey
type: string
required: false
sensitive: true
description: |
Required if "oidcClientAuthMethod" is set to "client_jwt".
PEM-encoded private key used to sign JWT client assertions for client_jwt.
example: |
-----BEGIN PRIVATE KEY-----\n...
- name: oidcResource
type: string
required: false
description: |
Primarily used with "client_jwt". Optional OAuth2 resource (audience) parameter
to include in the token request when required by the identity provider.
example: '"api://kafka"'
- name: oidcAudience
type: string
required: false
description: |
Only used if "oidcClientAuthMethod" is set to "client_jwt".
Overrides the JWT client assertion audience (aud). If not set, the component uses the
issuer derived from the token endpoint URL when available; otherwise, it falls back to the token URL.
example: '"http://<idp-host>/realms/local"'
- title: "SASL Authentication"
description: |
Authenticate using SASL.
Expand Down Expand Up @@ -348,7 +392,7 @@ metadata:
type: bool
required: false
description: |
Enables URL escaping of the message header values.
Enables URL escaping of the message header values.
It allows sending headers with special characters that are usually not allowed in HTTP headers.
example: "true"
default: "false"
Expand Down
75 changes: 43 additions & 32 deletions common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,36 +65,41 @@ const (
)

type KafkaMetadata struct {
Brokers string `mapstructure:"brokers"`
internalBrokers []string `mapstructure:"-"`
ConsumerGroup string `mapstructure:"consumerGroup"`
ClientID string `mapstructure:"clientId"`
AuthType string `mapstructure:"authType"`
SaslUsername string `mapstructure:"saslUsername"`
SaslPassword string `mapstructure:"saslPassword"`
SaslMechanism string `mapstructure:"saslMechanism"`
InitialOffset string `mapstructure:"initialOffset"`
internalInitialOffset int64 `mapstructure:"-"`
MaxMessageBytes int `mapstructure:"maxMessageBytes"`
OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
OidcClientID string `mapstructure:"oidcClientID"`
OidcClientSecret string `mapstructure:"oidcClientSecret"`
OidcScopes string `mapstructure:"oidcScopes"`
OidcExtensions string `mapstructure:"oidcExtensions"`
internalOidcScopes []string `mapstructure:"-"`
TLSDisable bool `mapstructure:"disableTls"`
TLSSkipVerify bool `mapstructure:"skipVerify"`
TLSCaCert string `mapstructure:"caCert"`
TLSClientCert string `mapstructure:"clientCert"`
TLSClientKey string `mapstructure:"clientKey"`
ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
Version string `mapstructure:"version"`
EscapeHeaders bool `mapstructure:"escapeHeaders"`
internalVersion sarama.KafkaVersion `mapstructure:"-"`
internalOidcExtensions map[string]string `mapstructure:"-"`
Brokers string `mapstructure:"brokers"`
internalBrokers []string `mapstructure:"-"`
ConsumerGroup string `mapstructure:"consumerGroup"`
ClientID string `mapstructure:"clientId"`
AuthType string `mapstructure:"authType"`
SaslUsername string `mapstructure:"saslUsername"`
SaslPassword string `mapstructure:"saslPassword"`
SaslMechanism string `mapstructure:"saslMechanism"`
InitialOffset string `mapstructure:"initialOffset"`
internalInitialOffset int64 `mapstructure:"-"`
MaxMessageBytes int `mapstructure:"maxMessageBytes"`
OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
OidcClientID string `mapstructure:"oidcClientID"`
OidcClientSecret string `mapstructure:"oidcClientSecret"`
OidcScopes string `mapstructure:"oidcScopes"`
OidcExtensions string `mapstructure:"oidcExtensions"`
OidcClientAuthMethod string `mapstructure:"oidcClientAuthMethod"`
OidcClientAssertionCert string `mapstructure:"oidcClientAssertionCert"`
OidcClientAssertionKey string `mapstructure:"oidcClientAssertionKey"`
OidcResource string `mapstructure:"oidcResource"`
OidcAudience string `mapstructure:"oidcAudience"`
internalOidcScopes []string `mapstructure:"-"`
TLSDisable bool `mapstructure:"disableTls"`
TLSSkipVerify bool `mapstructure:"skipVerify"`
TLSCaCert string `mapstructure:"caCert"`
TLSClientCert string `mapstructure:"clientCert"`
TLSClientKey string `mapstructure:"clientKey"`
ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
Version string `mapstructure:"version"`
EscapeHeaders bool `mapstructure:"escapeHeaders"`
internalVersion sarama.KafkaVersion `mapstructure:"-"`
internalOidcExtensions map[string]string `mapstructure:"-"`

// configs for kafka client
ClientConnectionTopicMetadataRefreshInterval time.Duration `mapstructure:"clientConnectionTopicMetadataRefreshInterval"`
Expand Down Expand Up @@ -235,8 +240,14 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
if m.OidcClientID == "" {
return nil, errors.New("kafka error: missing OIDC Client ID for authType 'oidc'")
}
if m.OidcClientSecret == "" {
return nil, errors.New("kafka error: missing OIDC Client Secret for authType 'oidc'")
if m.OidcClientAuthMethod == "client_secret" && m.OidcClientSecret == "" {
return nil, errors.New("kafka error: missing OIDC Client Secret for authType 'oidc' (client_secret)")
}
if m.OidcClientAuthMethod == "client_jwt" && m.OidcClientAssertionCert == "" {
return nil, errors.New("kafka error: missing OIDC Client Assertion Cert for authType 'oidc' (client_jwt)")
}
if m.OidcClientAuthMethod == "client_jwt" && m.OidcClientAssertionKey == "" {
return nil, errors.New("kafka error: missing OIDC Client Assertion Key for authType 'oidc' (client_jwt)")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please can we use pointers to signal presence.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used empty strings for consistence with the rest of the metadata fields. I think it'd be better to change it all in a different PR, wdyt?

if m.OidcScopes != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls add this field to the auth profile in the metadata.yaml as I don't see it with the openid string default

m.internalOidcScopes = strings.Split(m.OidcScopes, ",")
Expand Down
38 changes: 35 additions & 3 deletions common/component/kafka/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,9 @@ func TestMissingSaslValuesOnUpgrade(t *testing.T) {
require.Equal(t, fmt.Sprintf("kafka error: missing SASL Username for authType '%s'", passwordAuthType), err.Error())
}

func TestMissingOidcValues(t *testing.T) {
func TestMissingOidcClientSecretValues(t *testing.T) {
k := getKafka()
m := map[string]string{"brokers": "akfak.com:9092", "authType": oidcAuthType}
m := map[string]string{"brokers": "akfak.com:9092", "authType": oidcAuthType, "oidcClientAuthMethod": "client_secret"}
meta, err := k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
Expand All @@ -234,7 +234,7 @@ func TestMissingOidcValues(t *testing.T) {
meta, err = k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, fmt.Sprintf("kafka error: missing OIDC Client Secret for authType '%s'", oidcAuthType), err.Error())
require.Equal(t, fmt.Sprintf("kafka error: missing OIDC Client Secret for authType '%s' (client_secret)", oidcAuthType), err.Error())

// Check if missing scopes causes the default 'openid' to be used.
m["oidcClientSecret"] = "sassapass"
Expand All @@ -243,6 +243,38 @@ func TestMissingOidcValues(t *testing.T) {
require.Contains(t, meta.internalOidcScopes, "openid")
}

func TestMissingOidcClientJwtValues(t *testing.T) {
k := getKafka()
m := map[string]string{"brokers": "akfak.com:9092", "authType": oidcAuthType, "oidcClientAuthMethod": "client_jwt"}
meta, err := k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, fmt.Sprintf("kafka error: missing OIDC Token Endpoint for authType '%s'", oidcAuthType), err.Error())

m["oidcTokenEndpoint"] = "https://sassa.fra/"
meta, err = k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, fmt.Sprintf("kafka error: missing OIDC Client ID for authType '%s'", oidcAuthType), err.Error())

m["oidcClientID"] = "sassafras"
meta, err = k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, fmt.Sprintf("kafka error: missing OIDC Client Assertion Cert for authType '%s' (client_jwt)", oidcAuthType), err.Error())

m["oidcClientAssertionCert"] = "sassapass"
meta, err = k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, fmt.Sprintf("kafka error: missing OIDC Client Assertion Key for authType '%s' (client_jwt)", oidcAuthType), err.Error())

m["oidcClientAssertionKey"] = "sassapass"
meta, err = k.getKafkaMetadata(m)
require.NoError(t, err)
require.Contains(t, meta.internalOidcScopes, "openid")
}

func TestPresentSaslValues(t *testing.T) {
k := getKafka()
m := map[string]string{
Expand Down
Loading
Loading