diff --git a/bindings/kafka/metadata.yaml b/bindings/kafka/metadata.yaml index 7415e1a146..c04ab9a3e4 100644 --- a/bindings/kafka/metadata.yaml +++ b/bindings/kafka/metadata.yaml @@ -33,7 +33,7 @@ builtinAuthenticationProfiles: type: string required: false description: | - This maintains backwards compatibility with existing fields. + This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'accessKey' instead. If both fields are set, then 'accessKey' value will be used. AWS access key associated with an IAM account. @@ -43,7 +43,7 @@ builtinAuthenticationProfiles: required: false sensitive: true description: | - This maintains backwards compatibility with existing fields. + This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'secretKey' instead. If both fields are set, then 'secretKey' value will be used. The secret key associated with the access key. @@ -52,7 +52,7 @@ builtinAuthenticationProfiles: type: string sensitive: true description: | - This maintains backwards compatibility with existing fields. + This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'sessionToken' instead. If both fields are set, then 'sessionToken' value will be used. AWS session token to use. A session token is only required if you are using temporary security credentials. @@ -61,7 +61,7 @@ builtinAuthenticationProfiles: type: string required: false description: | - This maintains backwards compatibility with existing fields. + This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. If both fields are set, then 'assumeRoleArn' value will be used. IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. @@ -69,7 +69,7 @@ builtinAuthenticationProfiles: - name: awsStsSessionName type: string description: | - This maintains backwards compatibility with existing fields. + This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'sessionName' instead. If both fields are set, then 'sessionName' value will be used. Represents the session name for assuming a role. @@ -78,7 +78,7 @@ builtinAuthenticationProfiles: authenticationProfiles: - title: "OIDC Authentication" description: | - Authenticate using OpenID Connect. + Authenticate using OpenID Connect providing a client secret. metadata: - name: authType type: string @@ -121,6 +121,72 @@ authenticationProfiles: example: | {"cluster":"kafka","poolid":"kafkapool"} type: string + - title: "OIDC Private Key JWT Authentication" + description: | + Authenticate using OpenID Connect providing a client certificate and private key. + metadata: + - name: authType + type: string + required: true + description: | + Authentication type. + This must be set to "oidc_private_key_jwt" for this authentication profile. + example: '"oidc_private_key_jwt"' + allowedValues: + - "oidc_private_key_jwt" + - name: oidcTokenEndpoint + type: string + required: true + description: | + URL of the OAuth2 identity provider access token endpoint. + example: '"https://identity.example.com/v1/token"' + - name: oidcClientID + description: | + The OAuth2 client ID that has been provisioned in the identity provider. + example: '"my-client-id"' + type: string + required: true + - name: oidcClientAssertionCert + type: string + required: true + description: | + PEM-encoded X.509 certificate used to advertise the client certificate in the x5c header. + example: | + -----BEGIN CERTIFICATE-----\n... + - name: oidcClientAssertionKey + type: string + required: true + sensitive: true + description: | + PEM-encoded private key used to sign the client certificate. + example: | + -----BEGIN PRIVATE KEY-----\n... + - name: oidcResource + type: string + required: false + description: | + Optional OAuth2 resource (audience) parameter to include in the token request when required by the identity provider. + example: '"api://kafka"' + - name: oidcAudience + type: string + required: false + description: | + Overrides the JWT client assertion audience (aud). If not set, the component uses the + issuer derived from the token endpoint URL when available; otherwise, it falls back to the token URL. + example: '"http:///realms/local"' + - name: oidcScopes + type: string + description: | + Comma-delimited list of OAuth2/OIDC scopes to request with the access token. + Although not required, this field is recommended. + example: '"openid,kafka-prod"' + default: '"openid"' + - name: oidcExtensions + description: | + String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token. + example: | + {"cluster":"kafka","poolid":"kafkapool"} + type: string - title: "SASL Authentication" description: | Authenticate using SASL. @@ -348,7 +414,7 @@ metadata: type: bool required: false description: | - Enables URL escaping of the message header values. + Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers. example: "true" default: "false" diff --git a/common/component/kafka/auth.go b/common/component/kafka/auth.go index ea8cc43fac..d84c577c3d 100644 --- a/common/component/kafka/auth.go +++ b/common/component/kafka/auth.go @@ -88,3 +88,20 @@ func updateOidcAuthInfo(config *sarama.Config, metadata *KafkaMetadata) error { return nil } + +func updateOidcPrivateKeyJWTAuthInfo(config *sarama.Config, metadata *KafkaMetadata) error { + tokenProvider := metadata.getOAuthTokenSourcePrivateKeyJWT() + + if metadata.TLSCaCert != "" { + err := tokenProvider.addCa(metadata.TLSCaCert) + if err != nil { + return fmt.Errorf("kafka: error setting oauth client trusted CA: %w", err) + } + } + + config.Net.SASL.Enable = true + config.Net.SASL.Mechanism = sarama.SASLTypeOAuth + config.Net.SASL.TokenProvider = tokenProvider + + return nil +} diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index e23659d165..713cd1222d 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -181,6 +181,12 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { if err != nil { return err } + case oidcPrivateKeyJWTAuthType: + k.logger.Info("Configuring SASL OAuth2/OIDC authentication with private key JWT") + err = updateOidcPrivateKeyJWTAuthInfo(config, meta) + if err != nil { + return err + } case passwordAuthType: k.logger.Info("Configuring SASL Password authentication") k.saslUsername = meta.SaslUsername diff --git a/common/component/kafka/metadata.go b/common/component/kafka/metadata.go index 273f32e547..2e8def21f2 100644 --- a/common/component/kafka/metadata.go +++ b/common/component/kafka/metadata.go @@ -42,6 +42,7 @@ const ( authType = "authType" passwordAuthType = "password" oidcAuthType = "oidc" + oidcPrivateKeyJWTAuthType = "oidc_private_key_jwt" mtlsAuthType = "mtls" awsIAMAuthType = "awsiam" noAuthType = "none" @@ -65,36 +66,40 @@ const ( ) type KafkaMetadata struct { - Brokers string `mapstructure:"brokers"` - internalBrokers []string `mapstructure:"-"` - ConsumerGroup string `mapstructure:"consumerGroup"` - ClientID string `mapstructure:"clientId"` - AuthType string `mapstructure:"authType"` - SaslUsername string `mapstructure:"saslUsername"` - SaslPassword string `mapstructure:"saslPassword"` - SaslMechanism string `mapstructure:"saslMechanism"` - InitialOffset string `mapstructure:"initialOffset"` - internalInitialOffset int64 `mapstructure:"-"` - MaxMessageBytes int `mapstructure:"maxMessageBytes"` - OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"` - OidcClientID string `mapstructure:"oidcClientID"` - OidcClientSecret string `mapstructure:"oidcClientSecret"` - OidcScopes string `mapstructure:"oidcScopes"` - OidcExtensions string `mapstructure:"oidcExtensions"` - internalOidcScopes []string `mapstructure:"-"` - TLSDisable bool `mapstructure:"disableTls"` - TLSSkipVerify bool `mapstructure:"skipVerify"` - TLSCaCert string `mapstructure:"caCert"` - TLSClientCert string `mapstructure:"clientCert"` - TLSClientKey string `mapstructure:"clientKey"` - ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"` - ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"` - HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"` - SessionTimeout time.Duration `mapstructure:"sessionTimeout"` - Version string `mapstructure:"version"` - EscapeHeaders bool `mapstructure:"escapeHeaders"` - internalVersion sarama.KafkaVersion `mapstructure:"-"` - internalOidcExtensions map[string]string `mapstructure:"-"` + Brokers string `mapstructure:"brokers"` + internalBrokers []string `mapstructure:"-"` + ConsumerGroup string `mapstructure:"consumerGroup"` + ClientID string `mapstructure:"clientId"` + AuthType string `mapstructure:"authType"` + SaslUsername string `mapstructure:"saslUsername"` + SaslPassword string `mapstructure:"saslPassword"` + SaslMechanism string `mapstructure:"saslMechanism"` + InitialOffset string `mapstructure:"initialOffset"` + internalInitialOffset int64 `mapstructure:"-"` + MaxMessageBytes int `mapstructure:"maxMessageBytes"` + OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"` + OidcClientID string `mapstructure:"oidcClientID"` + OidcClientSecret string `mapstructure:"oidcClientSecret"` + OidcScopes string `mapstructure:"oidcScopes"` + OidcExtensions string `mapstructure:"oidcExtensions"` + OidcClientAssertionCert string `mapstructure:"oidcClientAssertionCert"` + OidcClientAssertionKey string `mapstructure:"oidcClientAssertionKey"` + OidcResource string `mapstructure:"oidcResource"` + OidcAudience string `mapstructure:"oidcAudience"` + internalOidcScopes []string `mapstructure:"-"` + TLSDisable bool `mapstructure:"disableTls"` + TLSSkipVerify bool `mapstructure:"skipVerify"` + TLSCaCert string `mapstructure:"caCert"` + TLSClientCert string `mapstructure:"clientCert"` + TLSClientKey string `mapstructure:"clientKey"` + ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"` + ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"` + HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"` + SessionTimeout time.Duration `mapstructure:"sessionTimeout"` + Version string `mapstructure:"version"` + EscapeHeaders bool `mapstructure:"escapeHeaders"` + internalVersion sarama.KafkaVersion `mapstructure:"-"` + internalOidcExtensions map[string]string `mapstructure:"-"` // configs for kafka client ClientConnectionTopicMetadataRefreshInterval time.Duration `mapstructure:"clientConnectionTopicMetadataRefreshInterval"` @@ -251,6 +256,32 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error) } } k.logger.Debug("Configuring SASL token authentication via OIDC.") + case oidcPrivateKeyJWTAuthType: + if m.OidcTokenEndpoint == "" { + return nil, errors.New("kafka error: missing OIDC Token Endpoint for authType 'oidc_private_key_jwt'") + } + if m.OidcClientID == "" { + return nil, errors.New("kafka error: missing OIDC Client ID for authType 'oidc_private_key_jwt'") + } + if m.OidcClientAssertionCert == "" { + return nil, errors.New("kafka error: missing OIDC Client Assertion Cert for authType 'oidc_private_key_jwt'") + } + if m.OidcClientAssertionKey == "" { + return nil, errors.New("kafka error: missing OIDC Client Assertion Key for authType 'oidc_private_key_jwt'") + } + if m.OidcScopes != "" { + m.internalOidcScopes = strings.Split(m.OidcScopes, ",") + } else { + k.logger.Warn("Warning: no OIDC scopes specified, using default 'openid' scope only. This is a security risk for token reuse.") + m.internalOidcScopes = []string{"openid"} + } + if m.OidcExtensions != "" { + err = json.Unmarshal([]byte(m.OidcExtensions), &m.internalOidcExtensions) + if err != nil || len(m.internalOidcExtensions) < 1 { + return nil, errors.New("kafka error: improper OIDC Extensions format for authType 'oidc_private_key_jwt'") + } + } + k.logger.Debug("Configuring SASL token authentication via OIDC with private_key_jwt.") case mtlsAuthType: if m.TLSClientCert != "" { if !isValidPEM(m.TLSClientCert) { diff --git a/common/component/kafka/metadata_test.go b/common/component/kafka/metadata_test.go index bdba431a46..9dfd6edaf9 100644 --- a/common/component/kafka/metadata_test.go +++ b/common/component/kafka/metadata_test.go @@ -216,7 +216,7 @@ func TestMissingSaslValuesOnUpgrade(t *testing.T) { require.Equal(t, fmt.Sprintf("kafka error: missing SASL Username for authType '%s'", passwordAuthType), err.Error()) } -func TestMissingOidcValues(t *testing.T) { +func TestMissingOidcClientSecretValues(t *testing.T) { k := getKafka() m := map[string]string{"brokers": "akfak.com:9092", "authType": oidcAuthType} meta, err := k.getKafkaMetadata(m) @@ -243,6 +243,38 @@ func TestMissingOidcValues(t *testing.T) { require.Contains(t, meta.internalOidcScopes, "openid") } +func TestMissingOidcPrivateKeyJwtValues(t *testing.T) { + k := getKafka() + m := map[string]string{"brokers": "akfak.com:9092", "authType": oidcPrivateKeyJWTAuthType} + meta, err := k.getKafkaMetadata(m) + require.Error(t, err) + require.Nil(t, meta) + require.Equal(t, "kafka error: missing OIDC Token Endpoint for authType 'oidc_private_key_jwt'", err.Error()) + + m["oidcTokenEndpoint"] = "https://sassa.fra/" + meta, err = k.getKafkaMetadata(m) + require.Error(t, err) + require.Nil(t, meta) + require.Equal(t, "kafka error: missing OIDC Client ID for authType 'oidc_private_key_jwt'", err.Error()) + + m["oidcClientID"] = "sassafras" + meta, err = k.getKafkaMetadata(m) + require.Error(t, err) + require.Nil(t, meta) + require.Equal(t, "kafka error: missing OIDC Client Assertion Cert for authType 'oidc_private_key_jwt'", err.Error()) + + m["oidcClientAssertionCert"] = "sassapass" + meta, err = k.getKafkaMetadata(m) + require.Error(t, err) + require.Nil(t, meta) + require.Equal(t, "kafka error: missing OIDC Client Assertion Key for authType 'oidc_private_key_jwt'", err.Error()) + + m["oidcClientAssertionKey"] = "sassapass" + meta, err = k.getKafkaMetadata(m) + require.NoError(t, err) + require.Contains(t, meta.internalOidcScopes, "openid") +} + func TestPresentSaslValues(t *testing.T) { k := getKafka() m := map[string]string{ diff --git a/common/component/kafka/sasl_oauthbearer.go b/common/component/kafka/sasl_oauthbearer.go index 125956617c..eb9a684b63 100644 --- a/common/component/kafka/sasl_oauthbearer.go +++ b/common/component/kafka/sasl_oauthbearer.go @@ -17,12 +17,13 @@ import ( ctx "context" "crypto/tls" "crypto/x509" - "encoding/pem" "errors" "fmt" "net/http" "time" + "github.com/dapr/kit/crypto/pem" + "github.com/IBM/sarama" "golang.org/x/oauth2" ccred "golang.org/x/oauth2/clientcredentials" @@ -51,26 +52,21 @@ func (m KafkaMetadata) getOAuthTokenSource() *OAuthTokenSource { } } -var tokenRequestTimeout, _ = time.ParseDuration("30s") - func (ts *OAuthTokenSource) addCa(caPem string) error { pemBytes := []byte(caPem) - block, _ := pem.Decode(pemBytes) - - if block == nil || block.Type != "CERTIFICATE" { - return errors.New("PEM data not valid or not of a valid type (CERTIFICATE)") - } - - caCert, err := x509.ParseCertificate(block.Bytes) + caCerts, err := pem.DecodePEMCertificates(pemBytes) if err != nil { return fmt.Errorf("error parsing PEM certificate: %w", err) } + if len(caCerts) > 1 { + return fmt.Errorf("expected 1 certificate, got %d", len(caCerts)) + } if ts.trustedCas == nil { ts.trustedCas = make([]*x509.Certificate, 0) } - ts.trustedCas = append(ts.trustedCas, caCert) + ts.trustedCas = append(ts.trustedCas, caCerts[0]) return nil } @@ -113,9 +109,15 @@ func (ts *OAuthTokenSource) Token() (*sarama.AccessToken, error) { return nil, errors.New("cannot generate token, OAuthTokenSource not fully configured") } - oidcCfg := ccred.Config{ClientID: ts.ClientID, ClientSecret: ts.ClientSecret, Scopes: ts.Scopes, TokenURL: ts.TokenEndpoint.TokenURL, AuthStyle: ts.TokenEndpoint.AuthStyle} + oidcCfg := ccred.Config{ + ClientID: ts.ClientID, + ClientSecret: ts.ClientSecret, + Scopes: ts.Scopes, + TokenURL: ts.TokenEndpoint.TokenURL, + AuthStyle: ts.TokenEndpoint.AuthStyle, + } - timeoutCtx, cancel := ctx.WithTimeout(ctx.TODO(), tokenRequestTimeout) + timeoutCtx, cancel := ctx.WithTimeout(ctx.TODO(), 30*time.Second) defer cancel() ts.configureClient() diff --git a/common/component/kafka/sasl_oauthbearer_private_key_jwt.go b/common/component/kafka/sasl_oauthbearer_private_key_jwt.go new file mode 100644 index 0000000000..6f5c6b092c --- /dev/null +++ b/common/component/kafka/sasl_oauthbearer_private_key_jwt.go @@ -0,0 +1,218 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kafka + +import ( + ctx "context" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "github.com/dapr/kit/crypto/pem" + + "github.com/IBM/sarama" + "github.com/google/uuid" + "github.com/lestrrat-go/jwx/v2/jwa" + "github.com/lestrrat-go/jwx/v2/jwt" + "golang.org/x/oauth2" +) + +type OAuthTokenSourcePrivateKeyJWT struct { + CachedToken oauth2.Token + Extensions map[string]string + TokenEndpoint oauth2.Endpoint + ClientID string + ClientSecret string + Scopes []string + httpClient *http.Client + trustedCas []*x509.Certificate + skipCaVerify bool + ClientAuthMethod string + ClientAssertionCert string + ClientAssertionKey string + Resource string + Audience string +} + +type tokenResponse struct { + AccessToken string `json:"access_token"` + ExpiresIn int64 `json:"expires_in"` +} + +func (m KafkaMetadata) getOAuthTokenSourcePrivateKeyJWT() *OAuthTokenSourcePrivateKeyJWT { + return &OAuthTokenSourcePrivateKeyJWT{ + TokenEndpoint: oauth2.Endpoint{TokenURL: m.OidcTokenEndpoint}, + ClientID: m.OidcClientID, + ClientSecret: m.OidcClientSecret, + Scopes: m.internalOidcScopes, + Extensions: m.internalOidcExtensions, + skipCaVerify: m.TLSSkipVerify, + ClientAuthMethod: m.AuthType, + ClientAssertionCert: m.OidcClientAssertionCert, + ClientAssertionKey: m.OidcClientAssertionKey, + Resource: m.OidcResource, + Audience: m.OidcAudience, + } +} + +func (ts *OAuthTokenSourcePrivateKeyJWT) addCa(caPem string) error { + pemBytes := []byte(caPem) + + caCerts, err := pem.DecodePEMCertificates(pemBytes) + if err != nil { + return fmt.Errorf("error parsing PEM certificate: %w", err) + } + if len(caCerts) > 1 { + return fmt.Errorf("expected 1 certificate, got %d", len(caCerts)) + } + + if ts.trustedCas == nil { + ts.trustedCas = make([]*x509.Certificate, 0) + } + ts.trustedCas = append(ts.trustedCas, caCerts[0]) + + return nil +} + +func (ts *OAuthTokenSourcePrivateKeyJWT) configureClient() { + if ts.httpClient != nil { + return + } + + tlsConfig := &tls.Config{ + MinVersion: tls.VersionTLS12, + InsecureSkipVerify: ts.skipCaVerify, //nolint:gosec + } + + if ts.trustedCas != nil { + caPool, err := x509.SystemCertPool() + if err != nil { + caPool = x509.NewCertPool() + } + + for _, c := range ts.trustedCas { + caPool.AddCert(c) + } + tlsConfig.RootCAs = caPool + } + + ts.httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + } +} + +// At the time of writing this, the oauth2 package does not support the client assertion authentication method. +// Ref: https://github.com/golang/oauth2/issues/744 +func (ts *OAuthTokenSourcePrivateKeyJWT) Token() (*sarama.AccessToken, error) { + if ts.CachedToken.Valid() { + return ts.asSaramaToken(), nil + } + + if ts.TokenEndpoint.TokenURL == "" || ts.ClientID == "" { + return nil, errors.New("cannot generate token, OAuthTokenSourcePrivateKeyJWT not fully configured") + } + + if ts.ClientAssertionCert == "" || ts.ClientAssertionKey == "" { + return nil, errors.New("client_jwt requires client assertion cert and key") + } + + pk, err := pem.DecodePEMPrivateKey([]byte(ts.ClientAssertionKey)) + if err != nil { + return nil, fmt.Errorf("unable to parse private key: %w", err) + } + rsaKey, ok := pk.(*rsa.PrivateKey) + if !ok { + return nil, errors.New("client_jwt requires RSA private key") + } + + now := time.Now() + aud := ts.TokenEndpoint.TokenURL + + audClaim := aud + if ts.Audience != "" { + audClaim = ts.Audience + } + + token, err := jwt.NewBuilder(). + Issuer(ts.ClientID). + Subject(ts.ClientID). + Audience([]string{audClaim}). + IssuedAt(now). + Expiration(now.Add(1 * time.Minute)). + JwtID(uuid.New().String()). + NotBefore(now). + Build() + if err != nil { + return nil, fmt.Errorf("failed to build token: %w", err) + } + + assertion, err := jwt.Sign(token, jwt.WithKey(jwa.RS256, rsaKey)) + if err != nil { + return nil, fmt.Errorf("error signing client assertion: %w", err) + } + + urlValues := &url.Values{} + urlValues.Set("grant_type", "client_credentials") + urlValues.Set("client_id", ts.ClientID) + urlValues.Set("client_assertion_type", "urn:ietf:params:oauth:client-assertion-type:jwt-bearer") + urlValues.Set("client_assertion", string(assertion)) + if ts.Audience != "" { + urlValues.Set("audience", ts.Audience) + } + if ts.Resource != "" { + urlValues.Set("resource", ts.Resource) + } + if len(ts.Scopes) > 0 { + urlValues.Set("scope", strings.Join(ts.Scopes, " ")) + } + + timeoutCtx, cancel := ctx.WithTimeout(ctx.TODO(), 30*time.Second) + defer cancel() + ts.configureClient() + req, err := http.NewRequestWithContext(timeoutCtx, http.MethodPost, ts.TokenEndpoint.TokenURL, strings.NewReader(urlValues.Encode())) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + resp, err := ts.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("token endpoint returned %d", resp.StatusCode) + } + var tr tokenResponse + if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil { + return nil, err + } + if tr.AccessToken == "" { + return nil, errors.New("no access_token in response") + } + ts.CachedToken = oauth2.Token{AccessToken: tr.AccessToken, Expiry: time.Now().Add(time.Duration(tr.ExpiresIn) * time.Second)} + return ts.asSaramaToken(), nil +} + +func (ts *OAuthTokenSourcePrivateKeyJWT) asSaramaToken() *sarama.AccessToken { + return &(sarama.AccessToken{Token: ts.CachedToken.AccessToken, Extensions: ts.Extensions}) +} diff --git a/pubsub/kafka/metadata.yaml b/pubsub/kafka/metadata.yaml index 17999f4c03..2c08d3e56c 100644 --- a/pubsub/kafka/metadata.yaml +++ b/pubsub/kafka/metadata.yaml @@ -27,7 +27,7 @@ builtinAuthenticationProfiles: type: string required: false description: | - This maintains backwards compatibility with existing fields. + This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'accessKey' instead. If both fields are set, then 'accessKey' value will be used. AWS access key associated with an IAM account. @@ -37,7 +37,7 @@ builtinAuthenticationProfiles: required: false sensitive: true description: | - This maintains backwards compatibility with existing fields. + This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'secretKey' instead. If both fields are set, then 'secretKey' value will be used. The secret key associated with the access key. @@ -46,7 +46,7 @@ builtinAuthenticationProfiles: type: string sensitive: true description: | - This maintains backwards compatibility with existing fields. + This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'sessionToken' instead. If both fields are set, then 'sessionToken' value will be used. AWS session token to use. A session token is only required if you are using temporary security credentials. @@ -55,7 +55,7 @@ builtinAuthenticationProfiles: type: string required: false description: | - This maintains backwards compatibility with existing fields. + This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. If both fields are set, then 'assumeRoleArn' value will be used. IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. @@ -63,7 +63,7 @@ builtinAuthenticationProfiles: - name: awsStsSessionName type: string description: | - This maintains backwards compatibility with existing fields. + This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'sessionName' instead. If both fields are set, then 'sessionName' value will be used. Represents the session name for assuming a role. @@ -72,7 +72,7 @@ builtinAuthenticationProfiles: authenticationProfiles: - title: "OIDC Authentication" description: | - Authenticate using OpenID Connect. + Authenticate using OpenID Connect providing a client secret. metadata: - name: authType type: string @@ -115,6 +115,72 @@ authenticationProfiles: example: | {"cluster":"kafka","poolid":"kafkapool"} type: string + - title: "OIDC Private Key JWT Authentication" + description: | + Authenticate using OpenID Connect providing a client certificate and private key. + metadata: + - name: authType + type: string + required: true + description: | + Authentication type. + This must be set to "oidc_private_key_jwt" for this authentication profile. + example: '"oidc_private_key_jwt"' + allowedValues: + - "oidc_private_key_jwt" + - name: oidcTokenEndpoint + type: string + required: true + description: | + URL of the OAuth2 identity provider access token endpoint. + example: '"https://identity.example.com/v1/token"' + - name: oidcClientID + description: | + The OAuth2 client ID that has been provisioned in the identity provider. + example: '"my-client-id"' + type: string + required: true + - name: oidcClientAssertionCert + type: string + required: true + description: | + PEM-encoded X.509 certificate used to advertise the client certificate in the x5c header. + example: | + -----BEGIN CERTIFICATE-----\n... + - name: oidcClientAssertionKey + type: string + required: true + sensitive: true + description: | + PEM-encoded private key used to sign the client certificate. + example: | + -----BEGIN PRIVATE KEY-----\n... + - name: oidcResource + type: string + required: false + description: | + Optional OAuth2 resource (audience) parameter to include in the token request when required by the identity provider. + example: '"api://kafka"' + - name: oidcAudience + type: string + required: false + description: | + Overrides the JWT client assertion audience (aud). If not set, the component uses the + issuer derived from the token endpoint URL when available; otherwise, it falls back to the token URL. + example: '"http:///realms/local"' + - name: oidcScopes + type: string + description: | + Comma-delimited list of OAuth2/OIDC scopes to request with the access token. + Although not required, this field is recommended. + example: '"openid,kafka-prod"' + default: '"openid"' + - name: oidcExtensions + description: | + String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token. + example: | + {"cluster":"kafka","poolid":"kafkapool"} + type: string - title: "SASL Authentication" description: | Authenticate using SASL. @@ -339,7 +405,7 @@ metadata: type: bool required: false description: | - Enables URL escaping of the message header values. + Enables URL escaping of the message header values. It allows sending headers with special characters that are usually not allowed in HTTP headers. example: "true" default: "false" diff --git a/tests/certification/flow/dockercompose/dockercompose.go b/tests/certification/flow/dockercompose/dockercompose.go index 83bb243b4e..3069ca11a3 100644 --- a/tests/certification/flow/dockercompose/dockercompose.go +++ b/tests/certification/flow/dockercompose/dockercompose.go @@ -54,6 +54,7 @@ func (c Compose) Up(ctx flow.Context) error { "-p", c.project, "-f", c.filename, "up", "-d", + "--wait", "--remove-orphans").CombinedOutput() ctx.Log(string(out)) diff --git a/tests/certification/pubsub/kafka/components/auth_oidc_certs/kafka.yaml b/tests/certification/pubsub/kafka/components/auth_oidc_certs/kafka.yaml new file mode 100644 index 0000000000..0d662c3a14 --- /dev/null +++ b/tests/certification/pubsub/kafka/components/auth_oidc_certs/kafka.yaml @@ -0,0 +1,33 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messagebus +spec: + type: pubsub.kafka + version: v1 + metadata: + - name: brokers + value: localhost:9092 + - name: authType + value: "oidc_private_key_jwt" + - name: oidcTokenEndpoint + value: "http://localhost:8080/realms/local/protocol/openid-connect/token" + - name: oidcClientID + value: "dapr-kafka-client-jwt" + - name: oidcClientAssertionCert + secretKeyRef: + name: OIDC_CLIENT_ASSERTION_CERT + key: OIDC_CLIENT_ASSERTION_CERT + - name: oidcClientAssertionKey + secretKeyRef: + name: OIDC_CLIENT_ASSERTION_KEY + key: OIDC_CLIENT_ASSERTION_KEY + - name: oidcAudience + value: "http://keycloak:8080/realms/local/protocol/openid-connect/token" + - name: oidcScopes + value: "openid" + - name: disableTls + value: "true" + +auth: + secretStore: envvar-secret-store diff --git a/tests/certification/pubsub/kafka/components/auth_oidc_certs/localsecrets.yaml b/tests/certification/pubsub/kafka/components/auth_oidc_certs/localsecrets.yaml new file mode 100644 index 0000000000..38936ccf08 --- /dev/null +++ b/tests/certification/pubsub/kafka/components/auth_oidc_certs/localsecrets.yaml @@ -0,0 +1,8 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: envvar-secret-store +spec: + type: secretstores.local.env + version: v1 + metadata: diff --git a/tests/certification/pubsub/kafka/components/auth_oidc_secret_key/kafka.yaml b/tests/certification/pubsub/kafka/components/auth_oidc_secret_key/kafka.yaml new file mode 100644 index 0000000000..b1c93d0bb6 --- /dev/null +++ b/tests/certification/pubsub/kafka/components/auth_oidc_secret_key/kafka.yaml @@ -0,0 +1,22 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messagebus +spec: + type: pubsub.kafka + version: v1 + metadata: + - name: brokers + value: localhost:9092 + - name: authType + value: "oidc" + - name: oidcTokenEndpoint + value: "http://localhost:8080/realms/local/protocol/openid-connect/token" + - name: oidcClientID + value: "dapr-kafka-client-secret" + - name: oidcClientSecret + value: "dapr-kafka-secret" + - name: oidcScopes + value: "openid" + - name: disableTls + value: "true" diff --git a/tests/certification/pubsub/kafka/data/realm-export.json b/tests/certification/pubsub/kafka/data/realm-export.json new file mode 100644 index 0000000000..5e310f0958 --- /dev/null +++ b/tests/certification/pubsub/kafka/data/realm-export.json @@ -0,0 +1,43 @@ + +{ + "realm": "local", + "enabled": true, + "clients": [ + { + "clientId": "dapr-kafka-client-jwt", + "secret": "dapr-kafka-secret", + "clientAuthenticatorType": "client-jwt", + "attributes": { + "token.endpoint.auth.signing.alg": "RS256", + "jwt.credential.certificate": "${OIDC_CLIENT_ASSERTION_CERT_ONELINE}" + }, + "enabled": true, + "protocol": "openid-connect", + "publicClient": false, + "serviceAccountsEnabled": true, + "directAccessGrantsEnabled": false, + "standardFlowEnabled": false + }, + { + "clientId": "dapr-kafka-client-secret", + "secret": "dapr-kafka-secret", + "clientAuthenticatorType": "client-secret", + "enabled": true, + "protocol": "openid-connect", + "publicClient": false, + "serviceAccountsEnabled": true, + "directAccessGrantsEnabled": false, + "standardFlowEnabled": false + }, + { + "clientId": "kafka-broker", + "secret": "kafka-broker-secret", + "enabled": true, + "protocol": "openid-connect", + "publicClient": false, + "serviceAccountsEnabled": true, + "directAccessGrantsEnabled": false, + "standardFlowEnabled": false + } + ] +} diff --git a/tests/certification/pubsub/kafka/docker-compose.auth.yml b/tests/certification/pubsub/kafka/docker-compose.auth.yml new file mode 100644 index 0000000000..fc34a53ee9 --- /dev/null +++ b/tests/certification/pubsub/kafka/docker-compose.auth.yml @@ -0,0 +1,62 @@ +version: "3.7" +services: + kafka_oidc_with_certificates: + image: confluentinc/cp-server:7.7.5 + hostname: kafka_oidc_with_certificates + container_name: kafka_oidc_with_certificates + depends_on: + keycloak: + condition: service_healthy + healthcheck: + test: kafka-topics --bootstrap-server kafka_oidc_with_certificates:29092 --list + start_period: 20s + interval: 2s + timeout: 2s + retries: 5 + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 0 + KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_CONTROLLER_QUORUM_VOTERS: '0@kafka_oidc_with_certificates:29093' + CLUSTER_ID: 'JikQ_wHyRRSqpLUFRjMqwA' + KAFKA_MIN_INSYNC_REPLICAS: 1 + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAIN9092:SASL_PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,PLAIN9092://0.0.0.0:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAIN9092://localhost:9092 + KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER + KAFKA_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: http://keycloak:8080/realms/local/protocol/openid-connect/certs + KAFKA_SASL_OAUTHBEARER_EXPECTED_ISSUER: http://keycloak:8080/realms/local + KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: HTTPS + + # Listener configuration: PLAIN9092 + KAFKA_LISTENER_NAME_PLAIN9092_SASL_ENABLED_MECHANISMS: OAUTHBEARER + KAFKA_LISTENER_NAME_PLAIN9092_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler + KAFKA_LISTENER_NAME_PLAIN9092_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="thePrincipalName" oauth.client.id="kafka-broker" oauth.valid.issuer.uri="http://keycloak:8080/realms/local" oauth.jwks.endpoint.uri="http://keycloak:8080/realms/local/protocol/openid-connect/certs" oauth.config.id="PLAIN9092" oauth.client.secret="kafka-broker-secret"; + + keycloak: + image: quay.io/keycloak/keycloak:23.0.5 + hostname: keycloak + container_name: keycloak + command: ["start-dev", "--http-enabled=true", "--health-enabled=true", "--import-realm"] + healthcheck: + test: ["CMD-SHELL", "exec 3<>/dev/tcp/127.0.0.1/8080; echo -e 'GET /health/ready HTTP/1.1\r\nHost: localhost:8080\r\nConnection: close\r\n\r\n' >&3;cat <&3 | grep -q '\"status\": \"UP\"' && exit 0 || exit 1"] + start_period: 20s + interval: 2s + timeout: 2s + retries: 5 + environment: + - OIDC_CLIENT_ASSERTION_CERT_ONELINE + - KEYCLOAK_ADMIN=admin + - KEYCLOAK_ADMIN_PASSWORD=admin + - KC_HOSTNAME_URL=http://keycloak:8080 + ports: + - "8080:8080" + volumes: + - ./data/realm-export.json:/opt/keycloak/data/import/local-realm.json diff --git a/tests/certification/pubsub/kafka/kafka_test.go b/tests/certification/pubsub/kafka/kafka_test.go index da50d180a0..a0cb1c51bf 100644 --- a/tests/certification/pubsub/kafka/kafka_test.go +++ b/tests/certification/pubsub/kafka/kafka_test.go @@ -15,9 +15,17 @@ package kafka_test import ( "context" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" "encoding/json" + "encoding/pem" "fmt" + "math/big" + "os" "strconv" + "strings" "testing" "time" @@ -31,7 +39,9 @@ import ( // Pub/Sub. pubsub_kafka "github.com/dapr/components-contrib/pubsub/kafka" + secretstore_env "github.com/dapr/components-contrib/secretstores/local/env" pubsub_loader "github.com/dapr/dapr/pkg/components/pubsub" + secretstores_loader "github.com/dapr/dapr/pkg/components/secretstores" "github.com/dapr/dapr/pkg/config/protocol" // Dapr runtime and Go-SDK @@ -54,20 +64,26 @@ import ( ) const ( - sidecarName1 = "dapr-1" - sidecarName2 = "dapr-2" - sidecarName3 = "dapr-3" - sidecarNameAvro = "dapr-avro" - appID1 = "app-1" - appID2 = "app-2" - appID3 = "app-3" - appIDAvro = "app-avro" - clusterName = "kafkacertification" - dockerComposeYAML = "docker-compose.yml" - numMessages = 1000 - appPort = 8000 - portOffset = 2 - messageKey = "partitionKey" + sidecarName1 = "dapr-1" + sidecarName2 = "dapr-2" + sidecarName3 = "dapr-3" + sidecarNameOIDCCerts = "dapr-oidc-certs" + sidecarNameOIDCSecretKey = "dapr-oidc-secret-key" + sidecarNameAvro = "dapr-avro" + appID1 = "app-1" + appID2 = "app-2" + appID3 = "app-3" + appIDOIDCCerts = "app-oidc-certs" + appIDOIDCSecretKey = "app-oidc-secret-key" + appIDAvro = "app-avro" + clusterName = "kafkacertification" + clusterNameAuth = "kafkacertification-auth" + dockerComposeYAML = "docker-compose.yml" + dockerComposeYAMLAuth = "docker-compose.auth.yml" + numMessages = 1000 + appPort = 8000 + portOffset = 2 + messageKey = "partitionKey" pubsubName = "messagebus" topicName = "neworder" @@ -454,6 +470,98 @@ func TestKafka(t *testing.T) { Run() } +func TestKafkaAuth(t *testing.T) { + consumerGroupOIDCCerts := watcher.NewOrdered() + consumerGroupOIDCSecretKey := watcher.NewOrdered() + + application := func(appName string, watcher *watcher.Watcher) app.SetupFn { + return func(ctx flow.Context, s common.Service) error { + return multierr.Combine( + s.AddTopicEventHandler(&common.Subscription{ + PubsubName: pubsubName, + Topic: topicName, + Route: "/orders", + }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { + watcher.Observe(e.Data) + return false, nil + }), + ) + } + } + + sendTest := func(sidecarName string, watcher *watcher.Watcher) flow.Runnable { + return func(ctx flow.Context) error { + client := sidecar.GetClient(ctx, sidecarName) + + message := fmt.Sprintf("Hello! %s", uuid.New().String()) + + watcher.ExpectStrings(message) + + err := client.PublishEvent(ctx, pubsubName, topicName, message) + require.NoError(ctx, err, "error publishing message") + + watcher.Assert(ctx, time.Minute) + return nil + } + } + + // Generate a private key and certificate for the OIDC client + key, err := rsa.GenerateKey(rand.Reader, 2048) + require.NoError(t, err) + template := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + Organization: []string{"Dapr"}, + }, + } + cert, err := x509.CreateCertificate(rand.Reader, template, template, &key.PublicKey, key) + require.NoError(t, err) + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}) + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}) + os.Setenv("OIDC_CLIENT_ASSERTION_CERT", string(certPEM)) + os.Setenv("OIDC_CLIENT_ASSERTION_KEY", string(keyPEM)) + os.Setenv("OIDC_CLIENT_ASSERTION_CERT_ONELINE", strings.ReplaceAll(string(certPEM), "\n", "\\n")) + + flow.New(t, "kafka authentication"). + Step(dockercompose.Run(clusterNameAuth, dockerComposeYAMLAuth)). + Step("wait for broker sockets", + network.WaitForAddresses(5*time.Minute, "localhost:9092")). + Step("wait", flow.Sleep(10*time.Second)). + + // OIDC with secret key + Step(app.Run(appIDOIDCSecretKey, fmt.Sprintf(":%d", appPort), + application(appID1, consumerGroupOIDCSecretKey))). + Step(sidecar.Run(sidecarNameOIDCSecretKey, + append(componentRuntimeOptions(), + embedded.WithResourcesPath("./components/auth_oidc_secret_key"), + embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)), + embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)), + )..., + )). + Step("wait", flow.Sleep(10*time.Second)). + Step("send and wait(in-order)", sendTest(sidecarNameOIDCSecretKey, consumerGroupOIDCSecretKey)). + Step("stop sidecar", sidecar.Stop(sidecarNameOIDCSecretKey)). + Step("stop app", app.Stop(appIDOIDCSecretKey)). + + // OIDC with certificates + Step(app.Run(appIDOIDCCerts, fmt.Sprintf(":%d", appPort), + application(appID1, consumerGroupOIDCCerts))). + Step(sidecar.Run(sidecarNameOIDCCerts, + append(componentRuntimeOptions(), + embedded.WithResourcesPath("./components/auth_oidc_certs"), + embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)), + embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)), + )..., + )). + Step("wait", flow.Sleep(10*time.Second)). + Step("send and wait(in-order)", sendTest(sidecarNameOIDCCerts, consumerGroupOIDCCerts)). + Step("stop sidecar", sidecar.Stop(sidecarNameOIDCCerts)). + Step("stop app", app.Stop(appIDOIDCCerts)). + Run() +} + func componentRuntimeOptions() []embedded.Option { log := logger.NewLogger("dapr.components") @@ -461,7 +569,12 @@ func componentRuntimeOptions() []embedded.Option { pubsubRegistry.Logger = log pubsubRegistry.RegisterComponent(pubsub_kafka.NewKafka, "kafka") + secretstoreRegistry := secretstores_loader.NewRegistry() + secretstoreRegistry.Logger = log + secretstoreRegistry.RegisterComponent(secretstore_env.NewEnvSecretStore, "local.env") + return []embedded.Option{ embedded.WithPubSubs(pubsubRegistry), + embedded.WithSecretStores(secretstoreRegistry), } }