Add IAM credential rotation script #8
Conversation
Reviewer's Guide

Introduces a Python script that automates IAM access key rotation by creating new keys, disabling old ones, storing the credentials encrypted in a secured S3 bucket, and returning a 14-day presigned URL, along with unit tests validating the core rotation and storage logic.

Sequence diagram for IAM credential rotation and secure storage

```mermaid
sequenceDiagram
    actor Admin
    participant Script as aws_iam_secret_key_rotation_s3.py
    participant IAM as AWS IAM
    participant S3 as AWS S3
    Admin->>Script: Run script with IAM username
    Script->>IAM: list_access_keys(UserName)
    IAM-->>Script: Return existing keys
    Script->>IAM: create_access_key(UserName)
    IAM-->>Script: Return new access key
    loop For each old key
        Script->>IAM: update_access_key(Status=Inactive)
    end
    Script->>S3: create_bucket (secure config)
    Script->>S3: put_object (credentials, AES-256)
    Script->>S3: generate_presigned_url (14 days)
    Script-->>Admin: Print presigned URL
```
Class diagram for IAM credential rotation script structure

```mermaid
classDiagram
    class aws_iam_secret_key_rotation_s3 {
        +get_iam_client()
        +get_s3_client()
        +create_secure_bucket(bucket_name)
        +store_credentials(bucket, key, credentials)
        +generate_presigned_url(bucket, key)
        +rotate_and_store_credentials(user_name)
    }
```
Walkthrough

A new Python script has been added to automate AWS IAM access key rotation for a user, securely storing new credentials in a newly created S3 bucket and generating a pre-signed download URL. Corresponding unit tests using mocks have also been introduced to verify the key rotation and storage workflow.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant Script
    participant IAM
    participant S3
    User->>Script: Run script with IAM username
    Script->>IAM: List existing access keys
    Script->>IAM: Create new access key
    Script->>IAM: Disable old access keys
    Script->>S3: Create secure S3 bucket
    Script->>S3: Store new credentials (AES-256 encryption)
    Script->>S3: Generate pre-signed URL (14 days)
    Script->>User: Output pre-signed URL
```
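For reference, the stored object is a small JSON document. The sketch below shows its assumed shape — field names are taken from the diagrams above, and the values are invented examples, not real credentials:

```python
import json
from datetime import datetime

# Invented example values; the real script fills these from the IAM response.
creds = {
    "UserName": "alice",
    "AccessKeyId": "AKIAEXAMPLEKEYID",
    "SecretAccessKey": "<redacted>",
    "CreateDate": datetime(2024, 1, 1).isoformat(),
}

# Bytes payload suitable for put_object(..., Body=body, ServerSideEncryption="AES256")
body = json.dumps(creds).encode()
print(json.loads(body)["CreateDate"])  # 2024-01-01T00:00:00
```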
Hey @tblakex01 - I've reviewed your changes - here's some feedback:
- Rather than only disabling old access keys, consider deleting them after successful verification to avoid accumulating inactive keys.
- Creating a new S3 bucket on each rotation can lead to resource sprawl; consider reusing or cleaning up buckets or allowing a configurable bucket prefix.
- Consider parameterizing the expiration time for the pre-signed URL and the bucket name prefix to make the script more flexible.
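To make the first point concrete, here is a minimal sketch of delete-after-verification; `delete_old_keys_after_verification` and its `verify` callback are hypothetical names for illustration, not part of this PR:

```python
from unittest.mock import Mock

def delete_old_keys_after_verification(iam, user_name, new_key_id, verify):
    """Delete old access keys, but only once the new key is confirmed to work.

    ``verify`` is a caller-supplied callable (e.g. one that performs
    sts.get_caller_identity with the new credentials) returning True on success.
    """
    if not verify(new_key_id):
        raise RuntimeError("new key not verified; keeping old keys")
    metadata = iam.list_access_keys(UserName=user_name)["AccessKeyMetadata"]
    deleted = []
    for meta in metadata:
        if meta["AccessKeyId"] != new_key_id:
            iam.delete_access_key(UserName=user_name, AccessKeyId=meta["AccessKeyId"])
            deleted.append(meta["AccessKeyId"])
    return deleted

# Exercised with a mocked IAM client, mirroring the PR's own test style.
iam = Mock()
iam.list_access_keys.return_value = {
    "AccessKeyMetadata": [{"AccessKeyId": "OLD"}, {"AccessKeyId": "NEW"}]
}
print(delete_old_keys_after_verification(iam, "test", "NEW", lambda key_id: True))  # ['OLD']
```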
## Individual Comments
### Comment 1
<location> `scripts/aws_iam_secret_key_rotation_s3.py:41` </location>
<code_context>
+ return _s3_client
+
+
+def create_secure_bucket(bucket_name: str) -> None:
+ """Create an S3 bucket with public access blocked."""
+ s3 = get_s3_client()
+ s3.create_bucket(Bucket=bucket_name)
+ s3.put_public_access_block(
+ Bucket=bucket_name,
</code_context>
<issue_to_address>
No region specified for S3 bucket creation may cause issues in some environments.
Consider allowing the region to be specified or explicitly setting it when creating the bucket to avoid unintended defaults.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
def create_secure_bucket(bucket_name: str) -> None:
"""Create an S3 bucket with public access blocked."""
s3 = get_s3_client()
s3.create_bucket(Bucket=bucket_name)
s3.put_public_access_block(
Bucket=bucket_name,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
=======
def create_secure_bucket(bucket_name: str, region: str = "us-east-1") -> None:
"""Create an S3 bucket with public access blocked. Allows specifying region."""
s3 = get_s3_client()
if region == "us-east-1":
s3.create_bucket(Bucket=bucket_name)
else:
s3.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": region},
)
s3.put_public_access_block(
Bucket=bucket_name,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
>>>>>>> REPLACE
</suggested_fix>
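Separating the region logic into a pure helper keeps it testable without AWS; `bucket_creation_kwargs` is a hypothetical name used for illustration. Note the active region can be read from the client's public `meta.region_name` attribute rather than hard-coding a default:

```python
def bucket_creation_kwargs(bucket_name: str, region: str) -> dict:
    """Build create_bucket arguments valid in any region.

    us-east-1 is special-cased because S3 rejects an explicit
    LocationConstraint for it; every other region requires one.
    """
    kwargs = {"Bucket": bucket_name}
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs

# The caller would then do:
#   s3.create_bucket(**bucket_creation_kwargs(name, s3.meta.region_name))
print(bucket_creation_kwargs("iam-creds-abc", "us-east-1"))
print(bucket_creation_kwargs("iam-creds-abc", "eu-west-1"))
```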
### Comment 2
<location> `tests/test_secret_key_rotation_s3.py:48` </location>
<code_context>
+ bucket = "iam-creds-abc123"
+ s3.create_bucket.assert_called_once_with(Bucket=bucket)
+ s3.put_public_access_block.assert_called_once()
+ s3.put_object.assert_called_once_with(
+ Bucket=bucket,
+ Key="NEW.json",
</code_context>
<issue_to_address>
No assertion for the contents of the stored credentials.
Please add an assertion to verify the contents of the Body parameter in put_object, ensuring the stored credentials JSON has the correct structure and values.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
s3.put_object.assert_called_once_with(
Bucket=bucket,
Key="NEW.json",
Body=ANY,
ServerSideEncryption="AES256",
)
=======
s3.put_object.assert_called_once_with(
Bucket=bucket,
Key="NEW.json",
Body=ANY,
ServerSideEncryption="AES256",
)
# Assert the contents of the stored credentials
put_object_call = s3.put_object.call_args
body = put_object_call.kwargs.get("Body") if hasattr(put_object_call, "kwargs") else put_object_call[1]["Body"]
import json
creds = json.loads(body)
self.assertIn("AccessKeyId", creds)
self.assertIn("SecretAccessKey", creds)
self.assertEqual(creds["AccessKeyId"], "NEW")
self.assertEqual(creds["SecretAccessKey"], "SECRET")
>>>>>>> REPLACE
</suggested_fix>
Actionable comments posted: 4
🧹 Nitpick comments (1)
tests/test_secret_key_rotation_s3.py (1)
48-53: Consider verifying the actual credential content.

The test uses `ANY` for the Body parameter. Consider verifying the actual JSON content to ensure credentials are stored correctly. Replace the `ANY` assertion with actual content verification:

```diff
+ import json
+
  s3.put_object.assert_called_once_with(
      Bucket=bucket,
      Key="NEW.json",
-     Body=ANY,
+     Body=json.dumps({
+         "UserName": "test",
+         "AccessKeyId": "NEW",
+         "SecretAccessKey": "SECRET",
+         "CreateDate": "2024-01-01T00:00:00"
+     }).encode(),
      ServerSideEncryption="AES256",
  )
```
🔇 Additional comments (18)
scripts/aws_iam_secret_key_rotation_s3.py (11)
- 1-9: Well-structured module with clear documentation. The shebang, docstring, and module structure follow Python best practices. The docstring clearly explains the script's purpose and behavior.
- 11-19: Good imports and logging setup. The imports are properly organized and the logging configuration is appropriate for a CLI script.
- 21-30: Client caching implementation is correct. The global client caching pattern is appropriate for this script and follows AWS SDK best practices for reusing clients.
- 33-38: Consistent client caching pattern. The S3 client caching follows the same pattern as the IAM client, maintaining consistency.
- 41-53: Secure bucket creation with proper public access blocking. The function correctly creates a bucket with all public access blocked, which is essential for storing sensitive credentials securely.
- 56-65: Proper encryption configuration for credential storage. The function correctly uses AES-256 server-side encryption for storing sensitive credentials. The JSON encoding approach is appropriate.
- 68-75: Appropriate presigned URL generation. The 14-day expiration period is reasonable for credential rotation workflows, and the function correctly generates a presigned URL.
- 78-96: Secure key rotation logic following AWS best practices. The function correctly follows AWS IAM key rotation best practices: list existing keys, create new key, then disable old keys. This ensures no service interruption.
- 98-111: Proper credential structure and secure storage. The credential structure includes all necessary fields, and the `isoformat()` call ensures proper date serialization. The storage workflow is secure.
- 113-116: Good logging and output practices. The function appropriately logs the URL generation and prints to stdout for programmatic use.
- 119-131: Proper CLI interface and error handling. The argparse setup is appropriate, and the error handling correctly catches and logs AWS ClientError exceptions while re-raising them.

tests/test_secret_key_rotation_s3.py (7)

- 1-6: Proper test file structure and imports. The test file follows good practices with appropriate shebang, docstring, and unittest imports.
- 8-14: Correct path manipulation for module import. The sys.path manipulation correctly allows importing the script module, and the noqa comment appropriately suppresses the E402 import error.
- 17-18: Well-structured test class with clear purpose. The test class name and docstring clearly indicate its purpose.
- 20-28: Comprehensive mocking strategy. The patch decorators correctly mock all external dependencies (uuid4, S3 client, IAM client), ensuring isolated testing.
- 30-40: Realistic mock data setup. The mock return values realistically simulate AWS API responses with appropriate data structures.
- 42-56: Thorough assertion coverage. The test verifies all key aspects: URL return value, bucket creation, public access blocking, object upload with encryption, and key deactivation.
- 59-60: Proper test runner setup. The main block correctly sets up the test runner for standalone execution.
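One caveat neither review raises: with SigV4 signing, S3 rejects presigned URLs whose requested expiry exceeds 604,800 seconds (7 days), so a 14-day `ExpiresIn` passed to `generate_presigned_url` may fail when the URL is used. A defensive clamp (hypothetical helper, not in the PR) could look like:

```python
MAX_PRESIGN_SECONDS = 7 * 24 * 3600  # SigV4 hard limit: 604,800 seconds

def clamped_expiry(requested_days: int) -> int:
    """Return a presign expiry in seconds, capped at the SigV4 maximum."""
    return min(requested_days * 24 * 3600, MAX_PRESIGN_SECONDS)

print(clamped_expiry(14))  # 604800 rather than 1209600
print(clamped_expiry(3))   # 259200
```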
```python
def create_secure_bucket(bucket_name: str) -> None:
    """Create an S3 bucket with public access blocked."""
    s3 = get_s3_client()
    s3.create_bucket(Bucket=bucket_name)
```
🛠️ Refactor suggestion
Add region handling for S3 bucket creation.
The create_bucket call may fail in regions other than us-east-1 due to missing CreateBucketConfiguration. Consider adding region-specific configuration.
Apply this diff to handle different regions:
```diff
 def create_secure_bucket(bucket_name: str) -> None:
     """Create an S3 bucket with public access blocked."""
     s3 = get_s3_client()
-    s3.create_bucket(Bucket=bucket_name)
+    region = s3.meta.region_name
+    if region == 'us-east-1':
+        s3.create_bucket(Bucket=bucket_name)
+    else:
+        s3.create_bucket(
+            Bucket=bucket_name,
+            CreateBucketConfiguration={'LocationConstraint': region}
+        )
```
🤖 Prompt for AI Agents
In scripts/aws_iam_secret_key_rotation_s3.py at line 44, the S3 bucket creation
does not specify the region, which can cause failures outside the us-east-1
region. Modify the create_bucket call to include a CreateBucketConfiguration
parameter with the LocationConstraint set to the target region, but only if the
region is not us-east-1. This ensures compatibility with all AWS regions.
```python
for meta in existing:
    if meta["AccessKeyId"] != new_key["AccessKeyId"]:
        iam.update_access_key(
            UserName=user_name,
            AccessKeyId=meta["AccessKeyId"],
            Status="Inactive",
        )
```
💡 Verification agent
🧩 Analysis chain
Verify IAM user can have multiple access keys.
The script assumes existing keys should be disabled after creating a new one. Ensure this aligns with the IAM user's key limit (max 2 keys per user).
Run the following script to verify IAM access key limits and behavior:
🏁 Script executed:

```bash
#!/bin/bash
# Description: Verify IAM access key limits and rotation behavior
# Expected: IAM users can have maximum 2 access keys

# Check AWS documentation for IAM access key limits
echo "Checking IAM access key limits..."
aws iam get-account-summary --query 'SummaryMap.AccessKeysPerUserQuota' --output text 2>/dev/null || echo "IAM quota information not available"

# Search for any existing key rotation patterns in the codebase
echo "Searching for existing key rotation patterns..."
rg -A 5 -B 5 "list_access_keys|create_access_key|update_access_key" --type py
```

Length of output: 32775
Guard against exceeding IAM’s two-key quota
Before calling create_access_key in scripts/aws_iam_secret_key_rotation_s3.py (function rotate_and_store_credentials, around lines 90–96):
- Verify the count of `existing = iam.list_access_keys(UserName=user_name)["AccessKeyMetadata"]`. If `len(existing) >= 2`, disable the oldest key first (or immediately handle the quota) before creating a new one.
- Wrap the `iam.create_access_key(UserName=…)` call in a try/except to handle the `LimitExceeded` exception (`ClientError` with code `LimitExceeded`), disabling an old key and retrying or failing gracefully.
- Extend tests in tests/test_secret_key_rotation_s3.py to simulate a `LimitExceeded` error on `create_access_key` and confirm the fallback logic.
🤖 Prompt for AI Agents
In scripts/aws_iam_secret_key_rotation_s3.py around lines 90 to 96, before
calling iam.create_access_key, check if the number of existing access keys for
the user is 2 or more. If so, disable the oldest key first to avoid exceeding
IAM's two-key limit. Also, wrap the create_access_key call in a try/catch block
to catch ClientError with Code 'LimitExceeded'; in that case, disable an old key
and retry or fail gracefully. Additionally, update
tests/test_secret_key_rotation_s3.py to mock a LimitExceeded error on
create_access_key and verify the fallback behavior.
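A sketch of the quota guard described above, exercised with a mocked client in the PR's own test style. `create_key_within_quota` is a hypothetical name, and deleting (rather than merely disabling) the oldest key is one way to free a quota slot:

```python
from datetime import datetime
from unittest.mock import Mock

def create_key_within_quota(iam, user_name: str):
    """Create a new access key, freeing a slot first if the user already
    holds IAM's maximum of two access keys."""
    existing = iam.list_access_keys(UserName=user_name)["AccessKeyMetadata"]
    if len(existing) >= 2:
        oldest = min(existing, key=lambda meta: meta["CreateDate"])
        iam.delete_access_key(UserName=user_name, AccessKeyId=oldest["AccessKeyId"])
    return iam.create_access_key(UserName=user_name)["AccessKey"]

# A user already at the two-key quota: the 2023 key is removed first.
iam = Mock()
iam.list_access_keys.return_value = {
    "AccessKeyMetadata": [
        {"AccessKeyId": "K1", "CreateDate": datetime(2023, 1, 1)},
        {"AccessKeyId": "K2", "CreateDate": datetime(2024, 1, 1)},
    ]
}
iam.create_access_key.return_value = {"AccessKey": {"AccessKeyId": "K3"}}
print(create_key_within_quota(iam, "test")["AccessKeyId"])  # K3
```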
```python
def rotate_and_store_credentials(user_name: str) -> str:
    """Rotate keys for ``user_name`` and return pre-signed download URL."""
    iam = get_iam_client()

    # List current keys
    existing = iam.list_access_keys(UserName=user_name)["AccessKeyMetadata"]

    # Create new key
    response = iam.create_access_key(UserName=user_name)
    new_key = response["AccessKey"]

    # Disable old keys
    for meta in existing:
        if meta["AccessKeyId"] != new_key["AccessKeyId"]:
            iam.update_access_key(
                UserName=user_name,
                AccessKeyId=meta["AccessKeyId"],
                Status="Inactive",
            )

    bucket_name = f"iam-creds-{uuid4().hex}"
    create_secure_bucket(bucket_name)

    object_key = f"{new_key['AccessKeyId']}.json"
    store_credentials(
        bucket_name,
        object_key,
        {
            "UserName": user_name,
            "AccessKeyId": new_key["AccessKeyId"],
            "SecretAccessKey": new_key["SecretAccessKey"],
            "CreateDate": new_key["CreateDate"].isoformat(),
        },
    )

    url = generate_presigned_url(bucket_name, object_key)
    _logger.info("Pre-signed URL generated: %s", url)
    print(url)
    return url
```
🛠️ Refactor suggestion
Add cleanup mechanism for failed operations.
The function creates resources (bucket, object) but doesn't handle cleanup if later operations fail. Consider adding rollback logic.
Consider adding cleanup logic for failed operations:
```python
def rotate_and_store_credentials(user_name: str) -> str:
    """Rotate keys for ``user_name`` and return pre-signed download URL."""
    iam = get_iam_client()
    bucket_name = None
    new_key = None
    try:
        # List current keys
        existing = iam.list_access_keys(UserName=user_name)["AccessKeyMetadata"]
        # Create new key
        response = iam.create_access_key(UserName=user_name)
        new_key = response["AccessKey"]
        # Disable old keys
        for meta in existing:
            if meta["AccessKeyId"] != new_key["AccessKeyId"]:
                iam.update_access_key(
                    UserName=user_name,
                    AccessKeyId=meta["AccessKeyId"],
                    Status="Inactive",
                )
        bucket_name = f"iam-creds-{uuid4().hex}"
        create_secure_bucket(bucket_name)
        object_key = f"{new_key['AccessKeyId']}.json"
        store_credentials(
            bucket_name,
            object_key,
            {
                "UserName": user_name,
                "AccessKeyId": new_key["AccessKeyId"],
                "SecretAccessKey": new_key["SecretAccessKey"],
                "CreateDate": new_key["CreateDate"].isoformat(),
            },
        )
        url = generate_presigned_url(bucket_name, object_key)
        _logger.info("Pre-signed URL generated: %s", url)
        print(url)
        return url
    except Exception:
        # Cleanup on failure
        if bucket_name:
            try:
                get_s3_client().delete_bucket(Bucket=bucket_name)
            except Exception:
                pass
        if new_key:
            try:
                iam.delete_access_key(
                    UserName=user_name, AccessKeyId=new_key["AccessKeyId"]
                )
            except Exception:
                pass
        raise
```
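An alternative to hand-rolled cleanup flags is `contextlib.ExitStack`, which unwinds registered undo callbacks automatically on failure. This is a sketch under the same assumed function names, not a drop-in patch for the PR:

```python
from contextlib import ExitStack
from unittest.mock import Mock

def rotate_with_rollback(iam, s3, user_name: str, bucket_name: str):
    """Register an undo action after each side effect; on an exception the
    stack runs them in reverse order, and pop_all() disarms them on success."""
    with ExitStack() as undo:
        new_key = iam.create_access_key(UserName=user_name)["AccessKey"]
        undo.callback(
            iam.delete_access_key,
            UserName=user_name,
            AccessKeyId=new_key["AccessKeyId"],
        )
        s3.create_bucket(Bucket=bucket_name)
        undo.callback(s3.delete_bucket, Bucket=bucket_name)
        # ... store credentials and generate the presigned URL here ...
        undo.pop_all()  # success: keep the key and bucket
    return new_key

# Failure path: bucket creation raises, so the new key is rolled back.
iam, s3 = Mock(), Mock()
iam.create_access_key.return_value = {"AccessKey": {"AccessKeyId": "NEW"}}
s3.create_bucket.side_effect = RuntimeError("boom")
try:
    rotate_with_rollback(iam, s3, "test", "iam-creds-abc")
except RuntimeError:
    pass
print(iam.delete_access_key.called)  # True
```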
🤖 Prompt for AI Agents
In scripts/aws_iam_secret_key_rotation_s3.py between lines 78 and 117, the
function creates an S3 bucket and stores credentials but lacks cleanup if
subsequent steps fail, potentially leaving unused resources. To fix this, wrap
the resource creation and storage steps in a try-except block, and in the except
clause, delete the created S3 bucket and any stored objects to rollback partial
changes. Ensure that any exceptions are properly handled or re-raised after
cleanup to avoid resource leaks.
```python
class TestRotateAndStoreCredentials(unittest.TestCase):
    """Test credential rotation and S3 upload."""

    @patch("aws_iam_secret_key_rotation_s3.uuid4")
    @patch("aws_iam_secret_key_rotation_s3.get_s3_client")
    @patch("aws_iam_secret_key_rotation_s3.get_iam_client")
    def test_rotate_and_store(self, mock_iam_client, mock_s3_client, mock_uuid):
        iam = Mock()
        s3 = Mock()
        mock_iam_client.return_value = iam
        mock_s3_client.return_value = s3
        mock_uuid.return_value.hex = "abc123"

        iam.list_access_keys.return_value = {
            "AccessKeyMetadata": [{"AccessKeyId": "OLD"}]
        }
        iam.create_access_key.return_value = {
            "AccessKey": {
                "AccessKeyId": "NEW",
                "SecretAccessKey": "SECRET",
                "CreateDate": datetime(2024, 1, 1),
            }
        }
        s3.generate_presigned_url.return_value = "https://example.com/url"

        url = rotate.rotate_and_store_credentials("test")

        self.assertEqual(url, "https://example.com/url")
        bucket = "iam-creds-abc123"
        s3.create_bucket.assert_called_once_with(Bucket=bucket)
        s3.put_public_access_block.assert_called_once()
        s3.put_object.assert_called_once_with(
            Bucket=bucket,
            Key="NEW.json",
            Body=ANY,
            ServerSideEncryption="AES256",
        )
        iam.update_access_key.assert_called_once_with(
            UserName="test", AccessKeyId="OLD", Status="Inactive"
        )
```
🛠️ Refactor suggestion
Add tests for error conditions and edge cases.
The current test only covers the happy path. Consider adding tests for error conditions such as AWS API failures, user not found, or bucket creation failures.
Add these additional test methods to improve coverage:
```python
def test_rotate_and_store_iam_error(self):
    """Test handling of IAM API errors."""
    with patch("aws_iam_secret_key_rotation_s3.get_iam_client") as mock_iam:
        mock_iam.return_value.list_access_keys.side_effect = ClientError(
            {"Error": {"Code": "NoSuchEntity"}}, "ListAccessKeys"
        )
        with self.assertRaises(ClientError):
            rotate.rotate_and_store_credentials("nonexistent_user")

def test_rotate_and_store_s3_error(self):
    """Test handling of S3 API errors."""
    with patch("aws_iam_secret_key_rotation_s3.get_iam_client") as mock_iam, \
         patch("aws_iam_secret_key_rotation_s3.get_s3_client") as mock_s3:
        mock_iam.return_value.list_access_keys.return_value = {"AccessKeyMetadata": []}
        mock_iam.return_value.create_access_key.return_value = {
            "AccessKey": {
                "AccessKeyId": "NEW",
                "SecretAccessKey": "SECRET",
                "CreateDate": datetime(2024, 1, 1),
            }
        }
        mock_s3.return_value.create_bucket.side_effect = ClientError(
            {"Error": {"Code": "BucketAlreadyExists"}}, "CreateBucket"
        )
        with self.assertRaises(ClientError):
            rotate.rotate_and_store_credentials("test_user")
```

🤖 Prompt for AI Agents
rotate.rotate_and_store_credentials("test_user")🤖 Prompt for AI Agents
In tests/test_secret_key_rotation_s3.py around lines 17 to 56, the existing test
only covers the successful execution path of rotate_and_store_credentials. To
improve test coverage, add new test methods that simulate error conditions such
as IAM API failures (e.g., user not found) and S3 API failures (e.g., bucket
creation errors). Use patching to mock these error scenarios by setting
side_effect to ClientError exceptions on the relevant mocked AWS client methods,
and assert that the function raises these exceptions as expected.
Summary
Testing

- `black .`
- `flake8 scripts/ lambda/ tests/ --max-line-length=120 --ignore=E501,W503,E203`
- `mypy scripts/ lambda/ --ignore-missing-imports`
- `bandit -r scripts/ lambda/`
- `pytest`
- `pytest tests/integration`

https://chatgpt.com/codex/tasks/task_e_6877e073f29083329f9bffea9fdd1d35
Summary by Sourcery
Introduce a new CLI utility to rotate IAM access keys and securely store the resulting credentials in S3, and add unit tests for this functionality.