129 changes: 129 additions & 0 deletions scripts/aws_iam_secret_rotation_s3.py
@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Rotate IAM secret access keys and store the new credentials in S3.

This script creates a new access key for the specified IAM user, deactivates the
old key if provided, and uploads the new credentials to a newly created S3
bucket. The bucket blocks public access and uses AES-256 server-side encryption.
A pre-signed URL valid for 14 days is generated for secure retrieval.
"""

from __future__ import annotations

import argparse
import json
import logging
import time
from typing import Optional

import boto3
from botocore.exceptions import ClientError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

_iam_client = None
_s3_client = None


def get_iam_client():
    """Return a boto3 IAM client, creating it if needed."""
    global _iam_client
    if _iam_client is None:
        _iam_client = boto3.client("iam")
    return _iam_client


def get_s3_client():
    """Return a boto3 S3 client, creating it if needed."""
    global _s3_client
    if _s3_client is None:
        _s3_client = boto3.client("s3")
    return _s3_client


def create_secure_bucket(bucket_name: str) -> str:
    """Create an S3 bucket with public access blocked and AES-256 encryption."""
    s3 = get_s3_client()
    s3.create_bucket(Bucket=bucket_name)
    s3.put_public_access_block(
Comment on lines +44 to +48

suggestion (bug_risk): No region specified for S3 bucket creation may cause issues in some environments.

Consider allowing the region to be specified or explicitly setting it to avoid unintended defaults.

Suggested change
def create_secure_bucket(bucket_name: str) -> str:
    """Create an S3 bucket with public access blocked and AES-256 encryption."""
    s3 = get_s3_client()
    s3.create_bucket(Bucket=bucket_name)
    s3.put_public_access_block(

def create_secure_bucket(bucket_name: str, region: str = "us-east-1") -> str:
    """
    Create an S3 bucket with public access blocked and AES-256 encryption.

    Args:
        bucket_name: Name of the bucket to create.
        region: AWS region to create the bucket in. Defaults to 'us-east-1'.
    """
    s3 = get_s3_client()
    if region == "us-east-1":
        s3.create_bucket(Bucket=bucket_name)
    else:
        s3.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={"LocationConstraint": region},
        )
    s3.put_public_access_block(

Comment on lines +47 to +48

issue: No handling for bucket name collisions or pre-existing buckets.

Handle the exception when create_bucket fails due to an existing bucket, and consider retrying with a new name or providing a clearer error message.
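
One possible shape for that handling, sketched against the existing create_bucket call (the reuse-vs-fail policy here is an assumption, not part of this PR):

try:
    s3.create_bucket(Bucket=bucket_name)
except ClientError as exc:
    code = exc.response["Error"]["Code"]
    if code == "BucketAlreadyOwnedByYou":
        # Safe to reuse: the security settings below are applied regardless.
        logger.info("Bucket %s already exists and is owned by us", bucket_name)
    elif code == "BucketAlreadyExists":
        # Name is taken by another account; S3 bucket names are global.
        raise ValueError(f"Bucket name '{bucket_name}' is already taken") from exc
    else:
        raise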

        Bucket=bucket_name,
        PublicAccessBlockConfiguration={
            "BlockPublicAcls": True,
            "IgnorePublicAcls": True,
            "BlockPublicPolicy": True,
            "RestrictPublicBuckets": True,
        },
    )
    s3.put_bucket_encryption(
        Bucket=bucket_name,
        ServerSideEncryptionConfiguration={
            "Rules": [
                {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}
            ]
        },
    )
    return bucket_name

Comment on lines +44 to +66

🛠️ Refactor suggestion

Enhance error handling and region considerations.

The security configurations are excellent, but consider these improvements:

  1. Error handling: Bucket creation can fail due to name conflicts or permissions
  2. Region handling: Bucket creation in us-east-1 doesn't require CreateBucketConfiguration, but other regions do
  3. Bucket naming: Consider validating bucket name format
 def create_secure_bucket(bucket_name: str) -> str:
     """Create an S3 bucket with public access blocked and AES-256 encryption."""
     s3 = get_s3_client()
-    s3.create_bucket(Bucket=bucket_name)
+    try:
+        # Handle region-specific bucket creation
+        region = s3.meta.region_name
+        if region == 'us-east-1':
+            s3.create_bucket(Bucket=bucket_name)
+        else:
+            s3.create_bucket(
+                Bucket=bucket_name,
+                CreateBucketConfiguration={'LocationConstraint': region}
+            )
+    except ClientError as e:
+        if e.response['Error']['Code'] != 'BucketAlreadyOwnedByYou':
+            raise
+        logger.info("Bucket %s already exists and is owned by you", bucket_name)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def create_secure_bucket(bucket_name: str) -> str:
    """Create an S3 bucket with public access blocked and AES-256 encryption."""
    s3 = get_s3_client()
    s3.create_bucket(Bucket=bucket_name)
    s3.put_public_access_block(
        Bucket=bucket_name,
        PublicAccessBlockConfiguration={
            "BlockPublicAcls": True,
            "IgnorePublicAcls": True,
            "BlockPublicPolicy": True,
            "RestrictPublicBuckets": True,
        },
    )
    s3.put_bucket_encryption(
        Bucket=bucket_name,
        ServerSideEncryptionConfiguration={
            "Rules": [
                {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}
            ]
        },
    )
    return bucket_name

def create_secure_bucket(bucket_name: str) -> str:
    """Create an S3 bucket with public access blocked and AES-256 encryption."""
    s3 = get_s3_client()
    try:
        # Handle region-specific bucket creation
        region = s3.meta.region_name
        if region == 'us-east-1':
            s3.create_bucket(Bucket=bucket_name)
        else:
            s3.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={'LocationConstraint': region}
            )
    except ClientError as e:
        if e.response['Error']['Code'] != 'BucketAlreadyOwnedByYou':
            raise
        logger.info("Bucket %s already exists and is owned by you", bucket_name)
    s3.put_public_access_block(
        Bucket=bucket_name,
        PublicAccessBlockConfiguration={
            "BlockPublicAcls": True,
            "IgnorePublicAcls": True,
            "BlockPublicPolicy": True,
            "RestrictPublicBuckets": True,
        },
    )
    s3.put_bucket_encryption(
        Bucket=bucket_name,
        ServerSideEncryptionConfiguration={
            "Rules": [
                {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}
            ]
        },
    )
    return bucket_name
🤖 Prompt for AI Agents
In scripts/aws_iam_secret_rotation_s3.py around lines 44 to 66, improve the
create_secure_bucket function by adding error handling to catch exceptions from
bucket creation failures such as name conflicts or permission issues. Modify the
bucket creation call to include CreateBucketConfiguration with the region
parameter for regions other than us-east-1. Also, add validation logic before
bucket creation to ensure the bucket name conforms to AWS naming rules,
returning or raising an error if invalid.
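
For the naming check specifically, a validator along these lines would cover the core rules (a sketch; the helper name is hypothetical, and the regex intentionally skips rarer rules such as reserved prefixes):

import re

def is_valid_bucket_name(name: str) -> bool:
    """Rough check against S3 bucket naming rules; not exhaustive."""
    # 3-63 characters: lowercase letters, digits, dots, hyphens,
    # starting and ending with a letter or digit.
    if not re.fullmatch(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]", name):
        return False
    # Bucket names must not be formatted like an IPv4 address.
    return re.fullmatch(r"(\d{1,3}\.){3}\d{1,3}", name) is None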


def rotate_and_store_credentials(
    old_key_id: Optional[str] = None, user_name: Optional[str] = None
) -> str:
    """Rotate an IAM user's access key and store the new credentials in S3."""
    iam = get_iam_client()
    s3 = get_s3_client()

    if user_name is None:
Comment on lines +68 to +75

issue (bug_risk): Potential risk of exceeding IAM access key quota.

If the user already has two active keys and the old key isn't deactivated, create_access_key will fail. Please check the number of existing keys and handle this case appropriately.
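
A pre-flight check along these lines would surface the quota problem before create_access_key fails (a sketch; list_access_keys is the standard boto3 call, but the hard-fail policy is an assumption):

def ensure_key_slot_available(iam, user_name: str) -> None:
    # IAM allows at most two access keys per user, so a rotation with two
    # existing keys would fail with a LimitExceeded error.
    keys = iam.list_access_keys(UserName=user_name)["AccessKeyMetadata"]
    if len(keys) >= 2:
        raise RuntimeError(
            f"User '{user_name}' already has {len(keys)} access keys; "
            "deactivate and delete one before rotating."
        )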

        user_name = iam.get_user()["User"]["UserName"]

    response = iam.create_access_key(UserName=user_name)
    new_key = response["AccessKey"]

    if old_key_id:
        try:
            iam.update_access_key(
                UserName=user_name, AccessKeyId=old_key_id, Status="Inactive"
            )
        except ClientError as exc:  # pragma: no cover - safety net
            logger.error("Failed to deactivate old key: %s", exc)

    bucket_name = create_secure_bucket(f"iam-key-backup-{int(time.time())}")
    file_key = f"{new_key['AccessKeyId']}.json"
    body = json.dumps(
        {
            "AccessKeyId": new_key["AccessKeyId"],
            "SecretAccessKey": new_key["SecretAccessKey"],
        }
    )

    s3.put_object(
        Bucket=bucket_name,
        Key=file_key,
        Body=body,
        ServerSideEncryption="AES256",
    )

    url = s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket_name, "Key": file_key},
        ExpiresIn=14 * 24 * 60 * 60,
Comment on lines +105 to +108

🚨 suggestion (security): Long pre-signed URL expiration increases exposure risk.

A 14-day expiration is excessive for sensitive pre-signed URLs. Recommend reducing the duration or allowing it to be configurable to minimize risk.

Suggested implementation:

    url = s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket_name, "Key": file_key},
        ExpiresIn=presigned_url_expiration,
    )
    return url
def upload_and_generate_presigned_url(
    s3, bucket_name, file_key, body, presigned_url_expiration=3600
):
    """
    Uploads an object to S3 and generates a pre-signed URL.

    Args:
        s3: boto3 S3 client
        bucket_name: S3 bucket name
        file_key: S3 object key
        body: file content
        presigned_url_expiration: Expiration time in seconds (default: 1 hour)
    """
    s3.put_object(
        Bucket=bucket_name,
        Key=file_key,
        Body=body,
        ServerSideEncryption="AES256",
    )

    url = s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket_name, "Key": file_key},
        ExpiresIn=presigned_url_expiration,
    )
    return url

def main() -> None:
    parser = argparse.ArgumentParser(
        description="Rotate IAM credentials and store the secret in S3"
    )
    parser.add_argument(
        "--presigned-url-expiration",
        type=int,
        default=3600,
        help="Expiration (in seconds) for the pre-signed S3 URL (default: 3600 = 1 hour)",
    )
  • You will need to update the code that calls the S3 upload and pre-signed URL generation to use the new upload_and_generate_presigned_url function and pass the presigned_url_expiration argument from the CLI.
  • If the upload/generation logic is in a function not shown here, update its signature and calls accordingly.

    )
Comment on lines +105 to +109

issue (code-quality): Inline variable that is immediately returned (inline-immediately-returned-variable)

    return url

Comment on lines +68 to +111

🛠️ Refactor suggestion

Consider encryption for stored credentials and improve error handling.

The core logic is sound and follows AWS best practices, but there are some considerations:

  1. Credential storage: Storing credentials in plain JSON, even in an encrypted bucket, may not meet security requirements
  2. Error handling: Limited error handling for S3 operations
  3. Resource cleanup: No cleanup mechanism if operations fail partway through

Consider encrypting the credentials before storage:

+import base64
+from cryptography.fernet import Fernet

 def rotate_and_store_credentials(
     old_key_id: Optional[str] = None, user_name: Optional[str] = None
 ) -> str:
     """Rotate an IAM user's access key and store the new credentials in S3."""
     iam = get_iam_client()
     s3 = get_s3_client()

     if user_name is None:
         user_name = iam.get_user()["User"]["UserName"]

-    response = iam.create_access_key(UserName=user_name)
-    new_key = response["AccessKey"]
+    try:
+        response = iam.create_access_key(UserName=user_name)
+        new_key = response["AccessKey"]
+    except ClientError as e:
+        logger.error("Failed to create new access key: %s", e)
+        raise

Also consider using AWS KMS for additional encryption or implementing client-side encryption for the JSON payload.

📝 Committable suggestion


Suggested change
def rotate_and_store_credentials(
    old_key_id: Optional[str] = None, user_name: Optional[str] = None
) -> str:
    """Rotate an IAM user's access key and store the new credentials in S3."""
    iam = get_iam_client()
    s3 = get_s3_client()

    if user_name is None:
        user_name = iam.get_user()["User"]["UserName"]

    response = iam.create_access_key(UserName=user_name)
    new_key = response["AccessKey"]

    if old_key_id:
        try:
            iam.update_access_key(
                UserName=user_name, AccessKeyId=old_key_id, Status="Inactive"
            )
        except ClientError as exc:  # pragma: no cover - safety net
            logger.error("Failed to deactivate old key: %s", exc)

    bucket_name = create_secure_bucket(f"iam-key-backup-{int(time.time())}")
    file_key = f"{new_key['AccessKeyId']}.json"
    body = json.dumps(
        {
            "AccessKeyId": new_key["AccessKeyId"],
            "SecretAccessKey": new_key["SecretAccessKey"],
        }
    )

    s3.put_object(
        Bucket=bucket_name,
        Key=file_key,
        Body=body,
        ServerSideEncryption="AES256",
    )

    url = s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket_name, "Key": file_key},
        ExpiresIn=14 * 24 * 60 * 60,
    )
    return url

import base64
from cryptography.fernet import Fernet

def rotate_and_store_credentials(
    old_key_id: Optional[str] = None, user_name: Optional[str] = None
) -> str:
    """Rotate an IAM user's access key and store the new credentials in S3."""
    iam = get_iam_client()
    s3 = get_s3_client()

    if user_name is None:
        user_name = iam.get_user()["User"]["UserName"]

    try:
        response = iam.create_access_key(UserName=user_name)
        new_key = response["AccessKey"]
    except ClientError as e:
        logger.error("Failed to create new access key: %s", e)
        raise

    if old_key_id:
        try:
            iam.update_access_key(
                UserName=user_name, AccessKeyId=old_key_id, Status="Inactive"
            )
        except ClientError as exc:  # pragma: no cover - safety net
            logger.error("Failed to deactivate old key: %s", exc)

    bucket_name = create_secure_bucket(f"iam-key-backup-{int(time.time())}")
    file_key = f"{new_key['AccessKeyId']}.json"
    body = json.dumps(
        {
            "AccessKeyId": new_key["AccessKeyId"],
            "SecretAccessKey": new_key["SecretAccessKey"],
        }
    )

    s3.put_object(
        Bucket=bucket_name,
        Key=file_key,
        Body=body,
        ServerSideEncryption="AES256",
    )

    url = s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket_name, "Key": file_key},
        ExpiresIn=14 * 24 * 60 * 60,
    )
    return url
🤖 Prompt for AI Agents
In scripts/aws_iam_secret_rotation_s3.py between lines 68 and 111, the
credentials are stored as plain JSON in S3 with only server-side encryption,
which may not meet security standards, and there is minimal error handling for
S3 operations and no cleanup on failure. To fix this, implement client-side
encryption of the JSON payload before uploading to S3, for example by using AWS
KMS to encrypt the credentials locally, then store the encrypted data.
Additionally, add try-except blocks around S3 put_object and presigned URL
generation to handle errors gracefully, and ensure any created resources like
buckets or keys are cleaned up if subsequent steps fail to avoid orphaned
resources.
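
A sketch of that shape, assuming a pre-existing KMS key alias (alias/iam-key-backup is illustrative, as are the helper name and the one-hour expiry):

def store_encrypted_credentials(s3, bucket_name: str, file_key: str, body: str) -> str:
    kms = boto3.client("kms")
    # Encrypt client-side with KMS so the plaintext never reaches S3.
    ciphertext = kms.encrypt(
        KeyId="alias/iam-key-backup",  # assumed pre-existing key alias
        Plaintext=body.encode("utf-8"),
    )["CiphertextBlob"]
    try:
        s3.put_object(
            Bucket=bucket_name,
            Key=file_key,
            Body=ciphertext,
            ServerSideEncryption="AES256",
        )
        return s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket_name, "Key": file_key},
            ExpiresIn=3600,
        )
    except ClientError:
        # Best-effort cleanup: the bucket is still empty if put_object failed,
        # so deleting it avoids leaving an orphaned resource behind.
        s3.delete_bucket(Bucket=bucket_name)
        raise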


def main() -> None:
    """CLI entry point."""
    parser = argparse.ArgumentParser(
        description="Rotate IAM credentials and store the secret in S3"
    )
    parser.add_argument(
        "--old-key-id", help="Existing access key ID to deactivate after rotation"
    )
    parser.add_argument("--user-name", help="IAM username (defaults to current user)")
    args = parser.parse_args()

    url = rotate_and_store_credentials(args.old_key_id, args.user_name)
    print(f"Pre-signed URL (valid for 14 days): {url}")


if __name__ == "__main__":
    main()
49 changes: 49 additions & 0 deletions tests/test_secret_rotation_s3.py
@@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""Unit tests for secret rotation S3 storage script."""

import unittest
from unittest.mock import Mock, patch

# Add scripts directory to path
import os
import sys

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))

import aws_iam_secret_rotation_s3 as rot # noqa: E402


class TestRotateAndStore(unittest.TestCase):

suggestion (testing): Consider adding test coverage for the CLI entry point (main function).

Add a test that invokes the CLI entry point, either by running the script as a subprocess or by mocking argparse, to verify correct argument parsing and output.

Suggested implementation:

import unittest
from unittest.mock import Mock, patch
import io

import aws_iam_secret_rotation_s3 as rot  # noqa: E402


class TestCLIEntryPoint(unittest.TestCase):
    """Test CLI entry point for aws_iam_secret_rotation_s3."""

    @patch("sys.argv", ["aws_iam_secret_rotation_s3.py", "--help"])
    @patch("sys.stdout", new_callable=io.StringIO)
    def test_main_help(self, mock_stdout):
        # Import inside test to ensure patches are in effect
        with self.assertRaises(SystemExit) as cm:
            import importlib
            importlib.reload(rot)
            rot.main()
        output = mock_stdout.getvalue()
        self.assertIn("usage", output.lower())
        self.assertEqual(cm.exception.code, 0)

    @patch("sys.argv", ["aws_iam_secret_rotation_s3.py", "--some-required-arg", "value"])
    @patch("aws_iam_secret_rotation_s3.rotate_and_store_credentials")
    @patch("sys.stdout", new_callable=io.StringIO)
    def test_main_invocation(self, mock_stdout, mock_rotate):
        # Simulate successful rotation
        mock_rotate.return_value = True
        import importlib
        importlib.reload(rot)
        rot.main()
        output = mock_stdout.getvalue()
        self.assertTrue(mock_rotate.called)
        # Optionally check for expected output
        # self.assertIn("Success", output)
  • Adjust the CLI argument names in the test (e.g., --some-required-arg) to match the actual required arguments of your CLI.
  • If your CLI prints to stderr, patch sys.stderr instead of sys.stdout.
  • If the main function is not named main, update the test to call the correct entry point.
  • If the CLI is structured differently (e.g., uses if __name__ == "__main__":), you may need to import and call the script differently.

"""Test rotate_and_store_credentials function."""

@patch("aws_iam_secret_rotation_s3.create_secure_bucket")
@patch("aws_iam_secret_rotation_s3.get_s3_client")
@patch("aws_iam_secret_rotation_s3.get_iam_client")
def test_rotate_and_store_success(self, mock_get_iam, mock_get_s3, mock_bucket):
iam_client = Mock()
s3_client = Mock()
mock_get_iam.return_value = iam_client
mock_get_s3.return_value = s3_client
mock_bucket.return_value = "test-bucket"

iam_client.get_user.return_value = {"User": {"UserName": "test"}}
iam_client.create_access_key.return_value = {
"AccessKey": {
"AccessKeyId": "AKIATEST",
"SecretAccessKey": "secret",
}
}
s3_client.generate_presigned_url.return_value = "https://example.com/url"

url = rot.rotate_and_store_credentials("OLDKEY")

iam_client.update_access_key.assert_called_once_with(
UserName="test", AccessKeyId="OLDKEY", Status="Inactive"
)
s3_client.put_object.assert_called_once()
mock_bucket.assert_called_once()
self.assertEqual(url, "https://example.com/url")

Comment on lines +16 to +46

@coderabbitai coderabbitai bot Jul 29, 2025


🛠️ Refactor suggestion

Expand test coverage for edge cases and error scenarios.

The current test only covers the happy path. Consider adding tests for:

  1. Error scenarios: IAM/S3 client failures
  2. Edge cases: No old key provided, explicit username
  3. Validation: Proper JSON structure in S3 object

Add these test methods:

def test_rotate_without_old_key(self, mock_get_iam, mock_get_s3, mock_bucket):
    # Test rotation without deactivating old key
    pass

def test_rotate_with_explicit_username(self, mock_get_iam, mock_get_s3, mock_bucket):
    # Test with provided username instead of getting current user
    pass

def test_rotate_iam_error(self, mock_get_iam, mock_get_s3, mock_bucket):
    # Test IAM client error handling
    pass
🤖 Prompt for AI Agents
In tests/test_secret_rotation_s3.py around lines 16 to 46, the existing test
only covers the successful rotation scenario. To improve coverage, add new test
methods to handle edge cases and error scenarios: one to test rotation without
providing an old key (ensuring no deactivation call is made), one to test
rotation with an explicit username parameter (mocking get_user accordingly), and
one to simulate IAM client errors (mocking methods to raise exceptions and
asserting proper error handling). Also, add assertions to validate the JSON
structure of the object stored in S3 in these tests.
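
For illustration, the no-old-key case could be filled in like this, reusing the mocking pattern from the existing test (a sketch):

    @patch("aws_iam_secret_rotation_s3.create_secure_bucket")
    @patch("aws_iam_secret_rotation_s3.get_s3_client")
    @patch("aws_iam_secret_rotation_s3.get_iam_client")
    def test_rotate_without_old_key(self, mock_get_iam, mock_get_s3, mock_bucket):
        iam_client = Mock()
        s3_client = Mock()
        mock_get_iam.return_value = iam_client
        mock_get_s3.return_value = s3_client
        mock_bucket.return_value = "test-bucket"
        iam_client.get_user.return_value = {"User": {"UserName": "test"}}
        iam_client.create_access_key.return_value = {
            "AccessKey": {"AccessKeyId": "AKIATEST", "SecretAccessKey": "secret"}
        }
        s3_client.generate_presigned_url.return_value = "https://example.com/url"

        rot.rotate_and_store_credentials()  # no old key supplied

        # Without an old key there must be no deactivation attempt.
        iam_client.update_access_key.assert_not_called()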


🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find all GitHub Actions workflow files
echo "=== GitHub Actions workflow files ==="
fd -e yml -e yaml .github/workflows

echo -e "\n=== Searching for actions/upload-artifact usage ==="
rg "actions/upload-artifact" .github/workflows/ -A 2 -B 2

echo -e "\n=== Looking for specific v3 usage ==="
rg "actions/upload-artifact@v3" .github/workflows/

Length of output: 1128


@bspeagle The job failure is caused by using a deprecated version of the actions/upload-artifact action in your GitHub Actions workflow.

The Issue:
Your .github/workflows/test.yml file is using actions/upload-artifact@v3, which GitHub deprecated in April 2024. This version will stop working, causing workflow failures.

The Fix:
Update the action from v3 to v4 in your workflow file. Here's what you need to change:

# Current (deprecated):
- name: Upload security scan results
  uses: actions/upload-artifact@v3
  if: always()
  with:
    # ... your configuration

# Updated (recommended):
- name: Upload security scan results
  uses: actions/upload-artifact@v4
  if: always()
  with:
    # ... your configuration

Key Changes in v4:

  • Improved performance and reliability
  • Better artifact management
  • Enhanced security features

Simply update the version number from @v3 to @v4 in your workflow file, and the job should run successfully again.


🏁 Script executed:

#!/bin/bash
# Find all workflow files correctly
echo "=== GitHub Actions workflow files ==="
fd . .github/workflows

echo -e "\n=== Full content of test.yml ==="
cat .github/workflows/test.yml

echo -e "\n=== Context around the upload-artifact usage ==="
rg "upload-artifact" .github/workflows/test.yml -A 5 -B 5

Length of output: 5985


@bspeagle The job failure is caused by using the deprecated actions/upload-artifact@v3 action in your GitHub Actions workflow.

The Issue:
In your .github/workflows/test.yml file, line 120 uses the deprecated version:

- name: Upload security scan results
  uses: actions/upload-artifact@v3  # ← This is deprecated
  if: always()
  with:
    name: security-scan-results
    path: bandit-report.json

The Fix:
Simply update line 120 to use v4:

- name: Upload security scan results
  uses: actions/upload-artifact@v4  # ← Change v3 to v4
  if: always()
  with:
    name: security-scan-results
    path: bandit-report.json

What This Does:
This step uploads your Bandit security scan results as an artifact so you can download and review them from the GitHub Actions run page, even if the scan finds issues (thanks to if: always()).

The v4 action provides the same functionality with improved performance and security. No other changes to your workflow configuration are needed.


🧠 Learnings used
Learnt from: CR
PR: tblakex01/iam-key-rotation#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-02T14:07:15.163Z
Learning: Applies to scripts/aws_iam_self_service_key_rotation.py : Scripts follow AWS best practices for key rotation.

Learnt from: CR
PR: tblakex01/iam-key-rotation#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-02T14:07:15.163Z
Learning: Applies to scripts/aws_iam_{self_service,user}_password_reset.py : Password generation uses Python's `secrets` module for cryptographic security.


if __name__ == "__main__":
    unittest.main()