Add IAM secret rotation feature with S3 storage #9
Conversation
Walkthrough

A new script was added to automate AWS IAM user access key rotation and securely store the new credentials in an S3 bucket with server-side encryption and public access blocked. The script provides a command-line interface and returns a 14-day pre-signed URL for credential retrieval. Corresponding unit tests were also introduced.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant Script
    participant IAM
    participant S3
    User->>Script: Run script with (optional) old_key_id, user_name
    Script->>IAM: Get IAM user info
    Script->>IAM: Create new access key
    alt old_key_id provided
        Script->>IAM: Deactivate old access key
    end
    Script->>S3: Create secure S3 bucket (if needed)
    Script->>S3: Store new credentials as JSON
    Script->>S3: Generate pre-signed URL (14 days)
    Script->>User: Output pre-signed URL
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~18 minutes
Reviewer's Guide

Introduces a standalone CLI script that rotates IAM access keys, securely stores the new credentials in an encrypted S3 bucket, and returns a 14-day pre-signed URL, along with a focused unit test covering the core rotation and storage logic.

Sequence diagram for IAM secret rotation and S3 storage process:

```mermaid
sequenceDiagram
    actor User
    participant CLI as aws_iam_secret_rotation_s3.py
    participant IAM as AWS IAM
    participant S3 as AWS S3
    User->>CLI: Run script with optional old_key_id and user_name
    CLI->>IAM: Get current user (if user_name not provided)
    CLI->>IAM: Create new access key for user
    CLI->>IAM: Deactivate old access key (if old_key_id provided)
    CLI->>S3: Create secure S3 bucket
    CLI->>S3: Store new credentials as JSON (AES-256 encrypted)
    CLI->>S3: Generate pre-signed URL (14 days)
    CLI-->>User: Output pre-signed URL
```
Class diagram for IAM secret rotation script structure:

```mermaid
classDiagram
    class aws_iam_secret_rotation_s3 {
        +main()
        +get_iam_client()
        +get_s3_client()
        +create_secure_bucket(bucket_name: str) str
        +rotate_and_store_credentials(old_key_id: Optional[str], user_name: Optional[str]) str
    }
    aws_iam_secret_rotation_s3 ..> boto3 : uses
    aws_iam_secret_rotation_s3 ..> argparse : uses
    aws_iam_secret_rotation_s3 ..> logging : uses
    aws_iam_secret_rotation_s3 ..> json : uses
    aws_iam_secret_rotation_s3 ..> time : uses
    aws_iam_secret_rotation_s3 ..> botocore.exceptions.ClientError : uses
```
File-Level Changes
Hey @tblakex01 - I've reviewed your changes - here's some feedback:
- Consider making the bucket name configurable or injecting a UUID component instead of using just a timestamp to avoid global name collisions and to give users the option to reuse an existing bucket.
- Add a lifecycle policy or cleanup mechanism for the created S3 buckets/objects so that old rotated secrets automatically expire and you don’t accumulate stale credentials over time.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider making the bucket name configurable or injecting a UUID component instead of using just a timestamp to avoid global name collisions and to give users the option to reuse an existing bucket.
- Add a lifecycle policy or cleanup mechanism for the created S3 buckets/objects so that old rotated secrets automatically expire and you don’t accumulate stale credentials over time.
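The lifecycle-policy suggestion above could be sketched as follows. This is a minimal illustration, not code from the PR: the rule ID, the 14-day default, and the helper names are assumptions, and the bucket must already exist.

```python
def build_expiration_rule(days=14):
    """Lifecycle configuration that expires every object after `days` days."""
    return {
        "Rules": [
            {
                "ID": "expire-rotated-secrets",  # hypothetical rule name
                "Status": "Enabled",
                "Filter": {"Prefix": ""},  # match all objects in the bucket
                "Expiration": {"Days": days},
            }
        ]
    }


def apply_expiration_rule(bucket_name, days=14):
    """Attach the expiration rule to an existing bucket (needs AWS credentials)."""
    import boto3  # imported lazily so the rule builder stays testable offline

    s3 = boto3.client("s3")
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration=build_expiration_rule(days),
    )
```

With a rule like this attached at bucket creation time, each stored credential file is deleted automatically once its pre-signed URL has long expired.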
## Individual Comments
### Comment 1
<location> `scripts/aws_iam_secret_rotation_s3.py:44` </location>
<code_context>
+ return _s3_client
+
+
+def create_secure_bucket(bucket_name: str) -> str:
+ """Create an S3 bucket with public access blocked and AES-256 encryption."""
+ s3 = get_s3_client()
+ s3.create_bucket(Bucket=bucket_name)
+ s3.put_public_access_block(
+ Bucket=bucket_name,
</code_context>
<issue_to_address>
No region specified for S3 bucket creation may cause issues in some environments.
Consider allowing the region to be specified or explicitly setting it to avoid unintended defaults.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
def create_secure_bucket(bucket_name: str) -> str:
"""Create an S3 bucket with public access blocked and AES-256 encryption."""
s3 = get_s3_client()
s3.create_bucket(Bucket=bucket_name)
s3.put_public_access_block(
=======
def create_secure_bucket(bucket_name: str, region: str = "us-east-1") -> str:
"""
Create an S3 bucket with public access blocked and AES-256 encryption.
Args:
bucket_name: Name of the bucket to create.
region: AWS region to create the bucket in. Defaults to 'us-east-1'.
"""
s3 = get_s3_client()
if region == "us-east-1":
s3.create_bucket(Bucket=bucket_name)
else:
s3.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": region},
)
s3.put_public_access_block(
>>>>>>> REPLACE
</suggested_fix>
### Comment 2
<location> `scripts/aws_iam_secret_rotation_s3.py:47` </location>
<code_context>
+def create_secure_bucket(bucket_name: str) -> str:
+ """Create an S3 bucket with public access blocked and AES-256 encryption."""
+ s3 = get_s3_client()
+ s3.create_bucket(Bucket=bucket_name)
+ s3.put_public_access_block(
+ Bucket=bucket_name,
+ PublicAccessBlockConfiguration={
</code_context>
<issue_to_address>
No handling for bucket name collisions or pre-existing buckets.
Handle the exception when create_bucket fails due to an existing bucket, and consider retrying with a new name or providing a clearer error message.
</issue_to_address>
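One way to handle collisions, sketched here under stated assumptions: combine a random suffix (per the UUID suggestion in the overall comments) with a small retry loop that only swallows name-collision errors. `unique_bucket_name`, the suffix length, and the retry count are illustrative choices, not part of the PR.

```python
import uuid


def unique_bucket_name(prefix="iam-key-backup"):
    """Prefix plus a random 12-hex-char suffix; far less collision-prone than a timestamp."""
    return f"{prefix}-{uuid.uuid4().hex[:12]}"


def create_bucket_with_retry(s3, prefix="iam-key-backup", attempts=3):
    """Try a few random names, re-raising anything that is not a name collision."""
    from botocore.exceptions import ClientError  # lazy so the name helper works offline

    for _ in range(attempts):
        name = unique_bucket_name(prefix)
        try:
            s3.create_bucket(Bucket=name)
            return name
        except ClientError as exc:
            code = exc.response["Error"]["Code"]
            if code not in ("BucketAlreadyExists", "BucketAlreadyOwnedByYou"):
                raise
    raise RuntimeError(f"no free bucket name found after {attempts} attempts")
```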
### Comment 3
<location> `scripts/aws_iam_secret_rotation_s3.py:68` </location>
<code_context>
+ return bucket_name
+
+
+def rotate_and_store_credentials(
+ old_key_id: Optional[str] = None, user_name: Optional[str] = None
+) -> str:
+ """Rotate an IAM user's access key and store the new credentials in S3."""
+ iam = get_iam_client()
+ s3 = get_s3_client()
+
+ if user_name is None:
+ user_name = iam.get_user()["User"]["UserName"]
+
</code_context>
<issue_to_address>
Potential risk of exceeding IAM access key quota.
If the user already has two active keys and the old key isn't deactivated, create_access_key will fail. Please check the number of existing keys and handle this case appropriately.
</issue_to_address>
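A sketch of one way to address the quota concern, not the script's actual behavior: check `list_access_keys` before creating a new key, and only ever delete a key that is already inactive. `ensure_key_capacity` and its delete-the-first-inactive-key policy are assumptions for illustration.

```python
MAX_KEYS_PER_USER = 2  # AWS IAM quota: at most two access keys per user


def removable_keys(key_metadata):
    """Key IDs of inactive keys that could be deleted to free quota.

    Returns [] when the user is under quota (nothing needs removing).
    `key_metadata` is the "AccessKeyMetadata" list from iam.list_access_keys().
    """
    if len(key_metadata) < MAX_KEYS_PER_USER:
        return []
    return [k["AccessKeyId"] for k in key_metadata if k["Status"] == "Inactive"]


def ensure_key_capacity(iam, user_name):
    """Fail fast, or free a slot, before calling create_access_key."""
    keys = iam.list_access_keys(UserName=user_name)["AccessKeyMetadata"]
    if len(keys) < MAX_KEYS_PER_USER:
        return
    candidates = removable_keys(keys)
    if not candidates:
        raise RuntimeError(
            f"{user_name} already has {len(keys)} active keys; "
            "deactivate one before rotating"
        )
    # Policy choice (an assumption): delete the first inactive key.
    iam.delete_access_key(UserName=user_name, AccessKeyId=candidates[0])
```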
### Comment 4
<location> `scripts/aws_iam_secret_rotation_s3.py:105` </location>
<code_context>
+ ServerSideEncryption="AES256",
+ )
+
+ url = s3.generate_presigned_url(
+ "get_object",
+ Params={"Bucket": bucket_name, "Key": file_key},
+ ExpiresIn=14 * 24 * 60 * 60,
+ )
+ return url
</code_context>
<issue_to_address>
Long pre-signed URL expiration increases exposure risk.
A 14-day expiration is excessive for sensitive pre-signed URLs. Recommend reducing the duration or allowing it to be configurable to minimize risk.
Suggested implementation:
```python
url = s3.generate_presigned_url(
"get_object",
Params={"Bucket": bucket_name, "Key": file_key},
ExpiresIn=presigned_url_expiration,
)
return url
```
```python
def upload_and_generate_presigned_url(
s3, bucket_name, file_key, body, presigned_url_expiration=3600
):
"""
Uploads an object to S3 and generates a pre-signed URL.
Args:
s3: boto3 S3 client
bucket_name: S3 bucket name
file_key: S3 object key
body: file content
presigned_url_expiration: Expiration time in seconds (default: 1 hour)
"""
s3.put_object(
Bucket=bucket_name,
Key=file_key,
Body=body,
ServerSideEncryption="AES256",
)
url = s3.generate_presigned_url(
"get_object",
Params={"Bucket": bucket_name, "Key": file_key},
ExpiresIn=presigned_url_expiration,
)
return url
def main() -> None:
```
```python
parser = argparse.ArgumentParser(
description="Rotate IAM credentials and store the secret in S3"
)
parser.add_argument(
"--presigned-url-expiration",
type=int,
default=3600,
help="Expiration (in seconds) for the pre-signed S3 URL (default: 3600 = 1 hour)",
)
```
- You will need to update the code that calls the S3 upload and pre-signed URL generation to use the new `upload_and_generate_presigned_url` function and pass the `presigned_url_expiration` argument from the CLI.
- If the upload/generation logic is in a function not shown here, update its signature and calls accordingly.
</issue_to_address>
### Comment 5
<location> `tests/test_secret_rotation_s3.py:16` </location>
<code_context>
+import aws_iam_secret_rotation_s3 as rot # noqa: E402
+
+
+class TestRotateAndStore(unittest.TestCase):
+ """Test rotate_and_store_credentials function."""
+
</code_context>
<issue_to_address>
Consider adding test coverage for the CLI entry point (main function).
Add a test that invokes the CLI entry point, either by running the script as a subprocess or by mocking `argparse`, to verify correct argument parsing and output.
Suggested implementation:
```python
import aws_iam_secret_rotation_s3 as rot # noqa: E402
```
```python
import unittest
from unittest.mock import Mock, patch
import io
```
```python
import aws_iam_secret_rotation_s3 as rot # noqa: E402
class TestCLIEntryPoint(unittest.TestCase):
"""Test CLI entry point for aws_iam_secret_rotation_s3."""
@patch("sys.argv", ["aws_iam_secret_rotation_s3.py", "--help"])
@patch("sys.stdout", new_callable=io.StringIO)
def test_main_help(self, mock_stdout):
# Import inside test to ensure patches are in effect
with self.assertRaises(SystemExit) as cm:
import importlib
importlib.reload(rot)
rot.main()
output = mock_stdout.getvalue()
self.assertIn("usage", output.lower())
self.assertEqual(cm.exception.code, 0)
@patch("sys.argv", ["aws_iam_secret_rotation_s3.py", "--some-required-arg", "value"])
@patch("aws_iam_secret_rotation_s3.rotate_and_store_credentials")
@patch("sys.stdout", new_callable=io.StringIO)
def test_main_invocation(self, mock_stdout, mock_rotate,):
# Simulate successful rotation
mock_rotate.return_value = True
import importlib
importlib.reload(rot)
rot.main()
output = mock_stdout.getvalue()
self.assertTrue(mock_rotate.called)
# Optionally check for expected output
# self.assertIn("Success", output)
```
- Adjust the CLI argument names in the test (e.g., `--some-required-arg`) to match the actual required arguments of your CLI.
- If your CLI prints to stderr, patch `sys.stderr` instead of `sys.stdout`.
- If the main function is not named `main`, update the test to call the correct entry point.
- If the CLI is structured differently (e.g., uses `if __name__ == "__main__":`), you may need to import and call the script differently.
</issue_to_address>
```python
url = s3.generate_presigned_url(
    "get_object",
    Params={"Bucket": bucket_name, "Key": file_key},
    ExpiresIn=14 * 24 * 60 * 60,
)
```
issue (code-quality): Inline variable that is immediately returned (inline-immediately-returned-variable)
Actionable comments posted: 3
🧹 Nitpick comments (1)

tests/test_secret_rotation_s3.py (1)

7-13: Consider using PYTHONPATH or proper packaging for imports. The path manipulation works but is fragile. Consider using the `PYTHONPATH` environment variable or proper Python packaging with `setup.py` for more robust imports. Alternative approach:

```
# Instead of sys.path manipulation, use:
#   export PYTHONPATH="${PYTHONPATH}:$(pwd)/scripts"
# Or create a setup.py with proper package structure
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `scripts/aws_iam_secret_rotation_s3.py` (1 hunks)
- `tests/test_secret_rotation_s3.py` (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: CR
PR: tblakex01/iam-key-rotation#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-02T14:07:15.163Z
Learning: Applies to scripts/aws_iam_self_service_key_rotation.py : Scripts follow AWS best practices for key rotation.
Learnt from: CR
PR: tblakex01/iam-key-rotation#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-02T14:07:15.163Z
Learning: Applies to scripts/aws_iam_{self_service,user}_password_reset.py : Password generation uses Python's `secrets` module for cryptographic security.
Learnt from: CR
PR: tblakex01/iam-key-rotation#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-02T14:07:15.163Z
Learning: Never commit AWS credentials.
Learnt from: CR
PR: tblakex01/iam-key-rotation#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-02T14:07:15.163Z
Learning: Applies to scripts/**/*.py : All scripts use boto3 for AWS API interactions.
🔇 Additional comments (4)

scripts/aws_iam_secret_rotation_s3.py (3)

- 1-26: LGTM! Clean imports and setup. The imports are well-organized, the docstring is comprehensive, and the lazy client initialization pattern using global variables is appropriate for this use case.
- 28-42: LGTM! Efficient lazy initialization pattern. The client factory functions implement lazy initialization correctly, reducing unnecessary AWS client creation. Note that this implementation is not thread-safe, but that's likely acceptable for this script's intended usage.
- 113-129: LGTM! Clean CLI implementation. The argument parsing is well-structured with clear descriptions, and the main function properly orchestrates the workflow. The output format is user-friendly.

tests/test_secret_rotation_s3.py (1)

- 48-49: LGTM! Standard test execution pattern. The main block allows direct execution of tests, following Python testing conventions.
```python
def create_secure_bucket(bucket_name: str) -> str:
    """Create an S3 bucket with public access blocked and AES-256 encryption."""
    s3 = get_s3_client()
    s3.create_bucket(Bucket=bucket_name)
    s3.put_public_access_block(
        Bucket=bucket_name,
        PublicAccessBlockConfiguration={
            "BlockPublicAcls": True,
            "IgnorePublicAcls": True,
            "BlockPublicPolicy": True,
            "RestrictPublicBuckets": True,
        },
    )
    s3.put_bucket_encryption(
        Bucket=bucket_name,
        ServerSideEncryptionConfiguration={
            "Rules": [
                {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}
            ]
        },
    )
    return bucket_name
```
🛠️ Refactor suggestion

Enhance error handling and region considerations.

The security configurations are excellent, but consider these improvements:

- Error handling: bucket creation can fail due to name conflicts or permissions.
- Region handling: bucket creation in `us-east-1` doesn't require `CreateBucketConfiguration`, but other regions do.
- Bucket naming: consider validating the bucket name format.
```diff
 def create_secure_bucket(bucket_name: str) -> str:
     """Create an S3 bucket with public access blocked and AES-256 encryption."""
     s3 = get_s3_client()
-    s3.create_bucket(Bucket=bucket_name)
+    try:
+        # Handle region-specific bucket creation
+        region = s3.meta.region_name
+        if region == 'us-east-1':
+            s3.create_bucket(Bucket=bucket_name)
+        else:
+            s3.create_bucket(
+                Bucket=bucket_name,
+                CreateBucketConfiguration={'LocationConstraint': region}
+            )
+    except ClientError as e:
+        if e.response['Error']['Code'] != 'BucketAlreadyOwnedByYou':
+            raise
+        logger.info("Bucket %s already exists and is owned by you", bucket_name)
```
🤖 Prompt for AI Agents
In scripts/aws_iam_secret_rotation_s3.py around lines 44 to 66, improve the
create_secure_bucket function by adding error handling to catch exceptions from
bucket creation failures such as name conflicts or permission issues. Modify the
bucket creation call to include CreateBucketConfiguration with the region
parameter for regions other than us-east-1. Also, add validation logic before
bucket creation to ensure the bucket name conforms to AWS naming rules,
returning or raising an error if invalid.
```python
def rotate_and_store_credentials(
    old_key_id: Optional[str] = None, user_name: Optional[str] = None
) -> str:
    """Rotate an IAM user's access key and store the new credentials in S3."""
    iam = get_iam_client()
    s3 = get_s3_client()

    if user_name is None:
        user_name = iam.get_user()["User"]["UserName"]

    response = iam.create_access_key(UserName=user_name)
    new_key = response["AccessKey"]

    if old_key_id:
        try:
            iam.update_access_key(
                UserName=user_name, AccessKeyId=old_key_id, Status="Inactive"
            )
        except ClientError as exc:  # pragma: no cover - safety net
            logger.error("Failed to deactivate old key: %s", exc)

    bucket_name = create_secure_bucket(f"iam-key-backup-{int(time.time())}")
    file_key = f"{new_key['AccessKeyId']}.json"
    body = json.dumps(
        {
            "AccessKeyId": new_key["AccessKeyId"],
            "SecretAccessKey": new_key["SecretAccessKey"],
        }
    )

    s3.put_object(
        Bucket=bucket_name,
        Key=file_key,
        Body=body,
        ServerSideEncryption="AES256",
    )

    url = s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket_name, "Key": file_key},
        ExpiresIn=14 * 24 * 60 * 60,
    )
    return url
```
🛠️ Refactor suggestion
Consider encryption for stored credentials and improve error handling.
The core logic is sound and follows AWS best practices, but there are some considerations:
- Credential storage: Storing credentials in plain JSON, even in an encrypted bucket, may not meet security requirements
- Error handling: Limited error handling for S3 operations
- Resource cleanup: No cleanup mechanism if operations fail partway through
Consider encrypting the credentials before storage:
```diff
+import base64
+from cryptography.fernet import Fernet

 def rotate_and_store_credentials(
     old_key_id: Optional[str] = None, user_name: Optional[str] = None
 ) -> str:
     """Rotate an IAM user's access key and store the new credentials in S3."""
     iam = get_iam_client()
     s3 = get_s3_client()
     if user_name is None:
         user_name = iam.get_user()["User"]["UserName"]
-    response = iam.create_access_key(UserName=user_name)
-    new_key = response["AccessKey"]
+    try:
+        response = iam.create_access_key(UserName=user_name)
+        new_key = response["AccessKey"]
+    except ClientError as e:
+        logger.error("Failed to create new access key: %s", e)
+        raise
```

Also consider using AWS KMS for additional encryption or implementing client-side encryption for the JSON payload.
🤖 Prompt for AI Agents
In scripts/aws_iam_secret_rotation_s3.py between lines 68 and 111, the
credentials are stored as plain JSON in S3 with only server-side encryption,
which may not meet security standards, and there is minimal error handling for
S3 operations and no cleanup on failure. To fix this, implement client-side
encryption of the JSON payload before uploading to S3, for example by using AWS
KMS to encrypt the credentials locally, then store the encrypted data.
Additionally, add try-except blocks around S3 put_object and presigned URL
generation to handle errors gracefully, and ensure any created resources like
buckets or keys are cleaned up if subsequent steps fail to avoid orphaned
resources.
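The cleanup-on-failure idea from the prompt above can be sketched as follows. The function name (`store_with_cleanup`) and the injected client parameter are illustrative, not the script's actual API; the same pattern could be folded into `rotate_and_store_credentials` directly:

```python
import logging

logger = logging.getLogger(__name__)

def store_with_cleanup(s3, bucket: str, key: str, body: str) -> str:
    """Upload the payload, then presign; delete the object if presigning
    fails so no orphaned credential file is left in the bucket."""
    s3.put_object(
        Bucket=bucket, Key=key, Body=body, ServerSideEncryption="AES256"
    )
    try:
        return s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=7 * 24 * 60 * 60,
        )
    except Exception as exc:
        # Best-effort cleanup: remove the just-written credentials object.
        logger.error("Presigning failed, removing %s/%s: %s", bucket, key, exc)
        s3.delete_object(Bucket=bucket, Key=key)
        raise
```

Because the client is passed in, the rollback path is easy to exercise with a `unittest.mock.Mock` whose `generate_presigned_url` raises.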
```python
class TestRotateAndStore(unittest.TestCase):
    """Test rotate_and_store_credentials function."""

    @patch("aws_iam_secret_rotation_s3.create_secure_bucket")
    @patch("aws_iam_secret_rotation_s3.get_s3_client")
    @patch("aws_iam_secret_rotation_s3.get_iam_client")
    def test_rotate_and_store_success(self, mock_get_iam, mock_get_s3, mock_bucket):
        iam_client = Mock()
        s3_client = Mock()
        mock_get_iam.return_value = iam_client
        mock_get_s3.return_value = s3_client
        mock_bucket.return_value = "test-bucket"

        iam_client.get_user.return_value = {"User": {"UserName": "test"}}
        iam_client.create_access_key.return_value = {
            "AccessKey": {
                "AccessKeyId": "AKIATEST",
                "SecretAccessKey": "secret",
            }
        }
        s3_client.generate_presigned_url.return_value = "https://example.com/url"

        url = rot.rotate_and_store_credentials("OLDKEY")

        iam_client.update_access_key.assert_called_once_with(
            UserName="test", AccessKeyId="OLDKEY", Status="Inactive"
        )
        s3_client.put_object.assert_called_once()
        mock_bucket.assert_called_once()
        self.assertEqual(url, "https://example.com/url")
```
|
🛠️ Refactor suggestion
Expand test coverage for edge cases and error scenarios.
The current test only covers the happy path. Consider adding tests for:
- Error scenarios: IAM/S3 client failures
- Edge cases: No old key provided, explicit username
- Validation: Proper JSON structure in S3 object
Add these test methods:
```python
def test_rotate_without_old_key(self, mock_get_iam, mock_get_s3, mock_bucket):
    # Test rotation without deactivating old key
    pass

def test_rotate_with_explicit_username(self, mock_get_iam, mock_get_s3, mock_bucket):
    # Test with provided username instead of getting current user
    pass

def test_rotate_iam_error(self, mock_get_iam, mock_get_s3, mock_bucket):
    # Test IAM client error handling
    pass
```

🤖 Prompt for AI Agents
In tests/test_secret_rotation_s3.py around lines 16 to 46, the existing test
only covers the successful rotation scenario. To improve coverage, add new test
methods to handle edge cases and error scenarios: one to test rotation without
providing an old key (ensuring no deactivation call is made), one to test
rotation with an explicit username parameter (mocking get_user accordingly), and
one to simulate IAM client errors (mocking methods to raise exceptions and
asserting proper error handling). Also, add assertions to validate the JSON
structure of the object stored in S3 in these tests.
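The JSON-structure check suggested above can be done by inspecting the mocked `put_object` call. A self-contained sketch — the helper name `stored_payload` is illustrative, not part of the test suite under review:

```python
import json
from unittest.mock import Mock

def stored_payload(s3_mock: Mock) -> dict:
    """Parse the Body passed to the mocked put_object and check its shape."""
    _, kwargs = s3_mock.put_object.call_args
    payload = json.loads(kwargs["Body"])
    assert {"AccessKeyId", "SecretAccessKey"} <= payload.keys()
    return payload

# Usage against a mock standing in for the real S3 client:
s3 = Mock()
s3.put_object(
    Bucket="test-bucket",
    Key="AKIATEST.json",
    Body=json.dumps({"AccessKeyId": "AKIATEST", "SecretAccessKey": "secret"}),
    ServerSideEncryption="AES256",
)
print(stored_payload(s3)["AccessKeyId"])  # → AKIATEST
```

Dropping this helper into the success test above turns the bare `put_object.assert_called_once()` into a check that the stored object is actually valid JSON with both expected fields.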
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Find all GitHub Actions workflow files
echo "=== GitHub Actions workflow files ==="
fd -e yml -e yaml .github/workflows

echo -e "\n=== Searching for actions/upload-artifact usage ==="
rg "actions/upload-artifact" .github/workflows/ -A 2 -B 2

echo -e "\n=== Looking for specific v3 usage ==="
rg "actions/upload-artifact@v3" .github/workflows/
```
Length of output: 1128
@bspeagle The job failure is caused by using a deprecated version of the actions/upload-artifact action in your GitHub Actions workflow.
The Issue:
Your .github/workflows/test.yml file is using actions/upload-artifact@v3, which GitHub deprecated in April 2024. This version will stop working, causing workflow failures.
The Fix:
Update the action from v3 to v4 in your workflow file. Here's what you need to change:
```yaml
# Current (deprecated):
- name: Upload security scan results
  uses: actions/upload-artifact@v3
  if: always()
  with:
    # ... your configuration

# Updated (recommended):
- name: Upload security scan results
  uses: actions/upload-artifact@v4
  if: always()
  with:
    # ... your configuration
```

Key Changes in v4:
- Improved performance and reliability
- Better artifact management
- Enhanced security features
Simply update the version number from @v3 to @v4 in your workflow file, and the job should run successfully again.
🏁 Script executed:
```shell
#!/bin/bash
# Find all workflow files correctly
echo "=== GitHub Actions workflow files ==="
fd . .github/workflows

echo -e "\n=== Full content of test.yml ==="
cat .github/workflows/test.yml

echo -e "\n=== Context around the upload-artifact usage ==="
rg "upload-artifact" .github/workflows/test.yml -A 5 -B 5
```
Length of output: 5985
@bspeagle The job failure is caused by using the deprecated actions/upload-artifact@v3 action in your GitHub Actions workflow.
The Issue:
In your .github/workflows/test.yml file, line 120 uses the deprecated version:
```yaml
- name: Upload security scan results
  uses: actions/upload-artifact@v3  # ← This is deprecated
  if: always()
  with:
    name: security-scan-results
    path: bandit-report.json
```

The Fix:
Simply update line 120 to use v4:
```yaml
- name: Upload security scan results
  uses: actions/upload-artifact@v4  # ← Change v3 to v4
  if: always()
  with:
    name: security-scan-results
    path: bandit-report.json
```

What This Does:
This step uploads your Bandit security scan results as an artifact so you can download and review them from the GitHub Actions run page, even if the scan finds issues (thanks to if: always()).
The v4 action provides the same functionality with improved performance and security. No other changes to your workflow configuration are needed.
🧠 Learnings used
Learnt from: CR
PR: tblakex01/iam-key-rotation#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-02T14:07:15.163Z
Learning: Applies to scripts/aws_iam_self_service_key_rotation.py : Scripts follow AWS best practices for key rotation.
Learnt from: CR
PR: tblakex01/iam-key-rotation#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-02T14:07:15.163Z
Learning: Applies to scripts/aws_iam_{self_service,user}_password_reset.py : Password generation uses Python's `secrets` module for cryptographic security.
Redundant with the previous PR. Will update the feature enhancement and create a new PR.
Summary
Adds `aws_iam_secret_rotation_s3.py` to rotate IAM secrets and store them securely in S3

Testing
- `black .`
- `flake8 scripts/ lambda/ tests/ --max-line-length=120 --ignore=E501,W503,E203`
- `mypy --ignore-missing-imports scripts/ lambda/`
- `bandit -r scripts/ lambda/`
- `pytest`
- `pytest tests/integration`
- `terraform init -backend=false`
- `terraform validate`
- `terraform fmt -recursive`
- `tflint`
- `tfsec .`
- `terraform plan`

https://chatgpt.com/codex/tasks/task_e_6877e073f29083329f9bffea9fdd1d35
Summary by Sourcery
Add a new CLI script to rotate AWS IAM access keys and securely store the new credentials in an encrypted S3 bucket with a 14-day presigned URL, and include unit tests for the rotation and storage logic.
New Features:
Tests:
Summary by CodeRabbit