Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,28 @@ sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
```

## Configuring a Kafka client to use AWS IAM with AWS_MSK_IAM mechanism with Custom STS Regional endpoints
You can configure a Kafka client to use AWS IAM for authentication by adding the following properties to the client's
configuration. This is wrapper on the IAM Auth library to support Regional Based STS Endpoint for to retrieve temporary assume role credentials.

```properties
# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL

# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM

# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='awsRoleArn' awsRoleSessionName='awsRoleSessionName' awsStsRegion='awsStsRegion' awsStsRegionalEndpoint='awsStsRegionalEndpoint';

# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.login.callback.handler.class=software.amazon.msk.auth.iam.STSAssumeRoleIAMClientCallbackHandler
# This is used during client authentication and reauthentication
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.STSAssumeRoleIAMClientCallbackHandler
```

## Configuring a Kafka client to use AWS IAM with SASL OAUTHBEARER mechanism
You can alternatively use SASL/OAUTHBEARER mechanism using IAM authentication by adding following configuration.
For more details on SASL/OAUTHBEARER mechanism, please read - [KIP-255](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876)
Expand Down Expand Up @@ -159,6 +181,18 @@ The Default Credential Provider Chain must contain the permissions necessary to
For example, if the client is an EC2 instance, its instance profile should have permission to assume the
`msk_client_role`.

### Specifying an AWS IAM Role with Custom STS Regional endpoints
The library supports another way to configure a client to assume an IAM role and use the role's temporary credentials. The IAM role's ARN and optionally the session name for the client can be passed in as client configuration property:

sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::123456789012:role/msk_client_role" awsRoleSessionName="producer" awsStsRegion="us-west-2" awsStsRegionalEndpoint="https://vpce-4kujcrex.sts.us-east-1.vpce.amazonaws.com";
In this case, the awsRoleArn specifies the ARN for the IAM role the client should use and awsRoleSessionName specifies the session name that this particular client should use while assuming the IAM role. If the same IAM Role is used in multiple contexts, the session names can be used to differentiate between the different contexts. The awsRoleSessionName is optional.

awsStsRegion specifies the regional endpoint of AWS STS to use while assuming the IAM role. If awsStsRegion is omitted the global endpoint for AWS STS is used by default. When the Kafka client is running in a VPC with an STS interface VPC Endpoint (AWS PrivateLink) to a regional endpoint of AWS STS and we want all STS traffic to go over that endpoint, we should set awsStsRegion to the region corresponding to the interface VPC Endpoint. It also be necessary to configure the awsStsRegionalEndpoint which points to custom regional based VPC endpoints.

The Default Credential Provider Chain must contain the permissions necessary to assume the client role. If the client is an EC2 instance, its instance profile should have permission to assume the msk_client_role and in case of non EC2 instance below environment variables needs to set to work with assume role credentials
AWS_ACCESS_KEY_ID = "This will be permanant access key id should have permission to assume the msk_client_role" ( For security reason we can keep yearly rotation based )
AWS_SECRET_ACCESS_KEY = "This will be permanant secrete key should have permission to assume the msk_client_role" ( For security reason we can keep yearly rotation based )

### Figuring out whether or not to use default credentials

When you want the MSK client to connect to MSK using credentials not found in the [AWS Default Credentials Provider Chain][DefaultCreds], you can specify an `awsProfileName` containing the credential profile to use, or an `awsRoleArn` to indicate an IAM Role’s ARN to assume using credentials in the Default Credential Provider Chain. These parameters are optional, and if they are not set the MSK client will use credentials from the Default Credential Provider Chain. There is no need to specify them if you intend to use an IAM role associated with an AWS compute service, such as EC2 or ECS to authenticate to MSK.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package software.amazon.msk.auth.iam;


import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.msk.auth.iam.internals.AWSCredentialsCallback;
import software.amazon.msk.auth.iam.internals.STSAssumeRoleMSKCredentialProvider;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class STSAssumeRoleIAMClientCallbackHandler extends IAMClientCallbackHandler {
private static final Logger log = LoggerFactory.getLogger(STSAssumeRoleIAMClientCallbackHandler.class);
private AwsCredentialsProvider provider;

@Override
public void configure(Map<String, ?> configs,
@NonNull String saslMechanism,
@NonNull List<AppConfigurationEntry> jaasConfigEntries) {
if (!IAMLoginModule.MECHANISM.equals(saslMechanism)) {
throw new IllegalArgumentException("Unexpected SASL mechanism: " + saslMechanism);
}
final Optional<AppConfigurationEntry> configEntry = jaasConfigEntries.stream()
.filter(j -> IAMLoginModule.class.getCanonicalName().equals(j.getLoginModuleName())).findFirst();
provider = configEntry.map(c -> (AwsCredentialsProvider) new STSAssumeRoleMSKCredentialProvider(c.getOptions()))
.orElse(DefaultCredentialsProvider.create());
log.info("Successfully retrieved Temp Credentials access key, secrete key");
}

@Override
public void close() {
try {
if (provider instanceof AutoCloseable) {
((AutoCloseable) provider).close();
}
} catch (Exception e) {
log.warn("Error closing provider", e);
}
}

@Override
public void handle(@NonNull Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (log.isDebugEnabled()) {
log.debug("Type information for callback: " + debugClassString(callback.getClass()) + " from "
+ debugClassString(this.getClass()));
}
if (callback instanceof AWSCredentialsCallback) {
handleCallback((AWSCredentialsCallback) callback);
} else {
String message = "Unsupported callback type: " + debugClassString(callback.getClass()) + " from "
+ debugClassString(this.getClass());
//We are breaking good practice and logging as well as throwing since this is where client side
//integrations might have trouble. Depending on the client framework either logging or throwing might
//surface the error more easily to the user.
log.error(message);
throw new UnsupportedCallbackException(callback, message);
}
}
}

protected static String debugClassString(Class<?> clazz) {
return "class: " + clazz.getName() + " classloader: " + clazz.getClassLoader().toString();
}

protected void handleCallback(AWSCredentialsCallback callback) throws IOException {
if (log.isDebugEnabled()) {
log.debug("Selecting provider {} to load credentials", provider.getClass().getName());
}

try {
callback.setAwsCredentials(provider.resolveCredentials());
log.info("Credentials are set in the callback handler");
} catch (Exception e) {
callback.setLoadingException(e);
}


}
}
Loading