feat: Add CLPRemoteHandler to handle Multipart Upload to AWS S3 bucket #50
Conversation
…ose for CLPFileHandler usage
Walkthrough
The changes introduce a new handler class, CLPS3Handler, which writes CLP IR encoded log messages to a local buffer and uploads them to an AWS S3 bucket.
Sequence Diagram(s)
sequenceDiagram
participant C as Client
participant H as CLPS3Handler
participant B as Local Buffer
participant S as S3 Bucket
C->>H: Sends log message
H->>B: Write encoded message to buffer
alt Buffer size below threshold
B-->>H: Continue buffering
else Buffer reaches threshold
H->>B: Trigger flush
H->>S: Upload log part (multipart upload)
S-->>H: Upload acknowledgement
end
C->>H: Closes logging handler
H->>S: Finalise multipart upload
S-->>H: Completion confirmation
H-->>C: Handler closed
Actionable comments posted: 1
🧹 Nitpick comments (5)
src/clp_logging/remote_handlers.py (5)
25-27: Consider storing S3 resource and client in a shared session.
Using a shared boto3.Session can help maintain consistent configuration and mitigate unexpected credential issues, particularly in multi-threaded or multi-process scenarios.
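A minimal sketch of the shared-session idea (the variable names are illustrative, not from the PR):

import boto3

# One Session resolves credentials/region once and hands out clients and resources,
# so every consumer sees the same configuration.
session = boto3.Session()            # e.g. boto3.Session(profile_name="logging")
s3_client = session.client("s3")
s3_resource = session.resource("s3")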
34-38: Allow for a customisable chunk size.
This static 5 MB chunk is the minimum S3 part size, which is valid but may not be optimal for all use cases. Consider making it configurable so that users can choose suitable part sizes, which may improve throughput for larger or smaller logs in different environments.
49-65: Consider adding a prefix for remote log names.
The _remote_log_naming method currently relies solely on a timestamp and the file count. Adding a static or configurable prefix (e.g. application_name/) can help organize logs by application, service, or environment.
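A small sketch of what such a prefixed key could look like (the helper name, prefix, and extension are hypothetical, not the handler's actual naming code):

def build_object_key(prefix: str, timestamp: int, file_count: int) -> str:
    """Illustrative helper: group objects under a per-application/service prefix."""
    return f"{prefix.rstrip('/')}/{timestamp}_log{file_count}.clp.zst"


# build_object_key("my-service", 1715000000, 0) -> "my-service/1715000000_log0.clp.zst"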
115-117: Synchronize concurrent upload attempts.
initiate_upload raises an exception if an upload is already in progress, but concurrency control must also be enforced if multiple threads or processes attempt uploads. Consider a thread lock or another concurrency mechanism to ensure safe usage.
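A rough sketch of guarding upload initiation with a lock (the class and attribute names are hypothetical; the real handler's state differs):

import threading


class UploadGuard:
    """Illustrative wrapper: allow only one multipart upload to be in flight at a time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._upload_in_progress = False

    def initiate_upload(self) -> None:
        with self._lock:
            if self._upload_in_progress:
                raise RuntimeError("An upload is already in progress.")
            self._upload_in_progress = True
            # ... create_multipart_upload() would be called here ...

    def finish_upload(self) -> None:
        with self._lock:
            self._upload_in_progress = False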
190-230: Enhance error handling for failed multipart completion.
If complete_multipart_upload fails on the final step, the code may orphan the partially uploaded segments. Consider implementing a retry or fallback mechanism to handle this scenario and avoid silent data loss.
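One possible shape for such a retry, sketched with a hypothetical client and bookkeeping arguments rather than the PR's actual code:

import time


def complete_with_retry(s3_client, bucket, key, upload_id, parts, attempts=3):
    """Retry completion a few times; abort the upload if it still fails."""
    for attempt in range(1, attempts + 1):
        try:
            return s3_client.complete_multipart_upload(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                MultipartUpload={"Parts": parts},
            )
        except Exception:
            if attempt == attempts:
                # Give up: abort so the partial parts are not silently orphaned.
                s3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
                raise
            time.sleep(2 ** attempt)  # simple exponential backoff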
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/clp_logging/remote_handlers.py
(1 hunks)
🔇 Additional comments (2)
src/clp_logging/remote_handlers.py (2)
138-181: Validate logic for extremely large log files.
The code restarts the multipart upload when 10,000 parts are reached. Ensure that re-initializing the upload works seamlessly for logs well beyond this boundary. Large-file handling may require further testing, especially under high throughput or multi-day runs.
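For context, a rough capacity calculation assuming the 5 MB minimum part size used here:

# Rough capacity of one multipart object at the 5 MB minimum part size (assumption).
part_size_bytes = 5 * 1024 * 1024
max_parts = 10_000
print(part_size_bytes * max_parts / 2**30)  # ~48.8 GiB per object before rotation kicks in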
243-246: Ensure a graceful order of operations when closing.
super().close() is invoked prior to finalising the multipart upload. If the superclass's close logic throws an exception, the upload might remain incomplete. Consider a try-except flow, or reverse the order and complete the upload first if that is safer for your use case.
src/clp_logging/remote_handlers.py
Outdated
def _upload_part(self) -> Dict[str, int | str]:
    if self.log_path is None:
        raise ValueError("No input file.")

    upload_data: bytes
    # Read the latest version of the file
    try:
        with open(self.log_path, "rb") as file:
            file.seek(self.multipart_upload_config["pos"])
            upload_data = file.read(self.multipart_upload_config["size"])
    except FileNotFoundError as e:
🛠️ Refactor suggestion
Handle empty or incomplete data reads more gracefully.
If the file has no new data, _upload_part may attempt to upload an empty byte array, which could be wasteful or unneeded. Ensure that the logic checks for empty data before proceeding with the upload.
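A small sketch of the suggested guard (the helper and variable names are placeholders, not the PR's code):

def read_next_chunk(path: str, pos: int, size: int) -> bytes:
    """Illustrative helper: return the next chunk, or b"" when there is no new data."""
    with open(path, "rb") as f:
        f.seek(pos)
        return f.read(size)


# Caller-side guard (sketch): skip upload_part entirely when the chunk is empty.
# chunk = read_next_chunk(log_path, pos, size)
# if not chunk:
#     return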
src/clp_logging/remote_handlers.py
Outdated
class CLPRemoteHandler(CLPFileHandler):
    """
    Handles CLP file upload and comparison to AWS S3 bucket. Configuration of
    AWS access key is required. Run command `aws configure`.
Configuration of AWS access key is required. Run command `aws configure`.

If I'm not wrong, the credentials are stored in ~/.aws by `aws configure`. Let's try to make the credentials directly configurable in our handler's configuration (probably via some `__init__` parameter, just like how the `HTTPHandler` requires a credentials parameter). Only when such credentials are not provided should we fall back to the credentials in ~/.aws.
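A hedged sketch of that fallback behaviour (the factory function and its parameter names are illustrative, not the PR's final API):

from typing import Optional

import boto3


def make_s3_client(
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
):
    """Illustrative factory: prefer explicit credentials, else boto3's default chain."""
    if aws_access_key_id and aws_secret_access_key:
        return boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
    # Fall back to the default credential chain (env vars, ~/.aws written by `aws configure`, ...)
    return boto3.client("s3")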
For test automation, we can look into setting up an S3 server with https://github.com/minio/minio
Left some comments in terms of high-level design problems.
I do have more comments about the coding details, but let's fix the design issues first.
Please try to fix the linter errors and ensure the GitHub workflows pass, thx!
Feel free to ask questions offline.
Actionable comments posted: 0
🧹 Nitpick comments (5)
src/clp_logging/handlers.py (5)
801-807: Docstring is concise
The docstring briefly describes the class purpose. If applicable, consider expanding it to clarify usage constraints, such as AWS credential requirements and minimum AWS permissions.
808-819: Consider error handling during initial stream write
While this method correctly initializes a Zstandard compressor and writes a preamble, consider wrapping the initial write operation in a try-except to handle potential stream write errors gracefully.
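A minimal illustration of that wrapping (the function, stream, and preamble names are placeholders):

import io


def write_preamble(ostream: io.BufferedIOBase, preamble: bytes) -> None:
    """Sketch: surface stream failures with a clearer error instead of a bare OSError."""
    try:
        ostream.write(preamble)
    except OSError as e:
        raise RuntimeError("Failed to write CLP preamble to the output stream") from e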
854-866: Zero-padding for the log path
Currently, the month/day folder path omits leading zeros. Although functional, consider a zero-padded folder structure to maintain consistent sorting (e.g. '2025/02/04') and avoid potential confusion.
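For example, strftime produces the zero-padded form directly (a sketch, not the handler's actual naming code):

from datetime import datetime

ts = datetime(2025, 2, 4)
padded = ts.strftime("%Y/%m/%d")             # "2025/02/04" - zero-padded, sorts lexicographically
unpadded = f"{ts.year}/{ts.month}/{ts.day}"  # "2025/2/4"  - the current, unpadded form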
868-879: Avoid forcing a flush on every write
Calling _flush() for each log event can degrade performance if logging is frequent. Consider selectively flushing only when the buffer reaches the threshold or on a fixed interval. Example of a conditionally triggered flush:
def _write(self, loglevel: int, msg: str) -> None:
    if self.closed:
        raise RuntimeError("Stream already closed")
    clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms)
    self.ostream.write(clp_msg)
-   self._flush()
+   if self.local_buffer.tell() >= self.buffer_size:
+       self._flush()
    if self.local_buffer.tell() >= self.buffer_size:
        ...
918-951: Remove or guard print(self.uploaded_parts)
A direct print in production code is often a leftover from debugging and can clutter logs. If you need this output, consider gating it behind a debug flag or using a logger.
- print(self.uploaded_parts)
+ # If debug printing is needed:
+ # logger.debug(f"Uploaded parts: {self.uploaded_parts}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/clp_logging/handlers.py
(2 hunks)
🔇 Additional comments (4)
src/clp_logging/handlers.py (4)
13-13: Imports look appropriate
These newly added imports (typing, datetime, io, boto3) appear to be used by the new class and methods. No issues observed.
Also applies to: 19-21
799-800
: No business logic or content
These lines do not appear to contain meaningful functionality and can be safely ignored.
820-852: Validate multipart upload creation result
This constructor calls create_multipart_upload but does not handle the scenario where "UploadId" might be missing or the call itself might fail. It may be worthwhile to wrap this in a try-except to manage errors proactively, log user-friendly error messages, or abort cleanly.
Do you want me to craft a verification script to search the codebase for usage of "create_multipart_upload" and check for robust error handling?
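A sketch of that defensive wrapping (the function and its bucket/key parameters are placeholders):

def start_multipart_upload(s3_client, bucket: str, key: str) -> str:
    """Sketch: create a multipart upload and fail loudly if no UploadId comes back."""
    try:
        response = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
    except Exception as e:
        raise RuntimeError(f"Could not start multipart upload for s3://{bucket}/{key}") from e
    upload_id = response.get("UploadId")
    if not upload_id:
        raise RuntimeError("create_multipart_upload returned no UploadId")
    return upload_id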
880-915: Repeated partial part overwrites
The same part number is reused until the buffer surpasses the 5 MB threshold, which repeatedly overwrites part data. If this is intended behaviour, it is acceptable, but it may lead to unneeded S3 traffic. Confirm this logic meets your expectations for how partial chunks are handled.
Actionable comments posted: 5
🧹 Nitpick comments (9)
src/clp_logging/handlers.py (9)
837-841: Consider clarifying the stream handling logic.
The current implementation sets self.ostream to the input stream but then uses self.local_buffer in the init method. This could lead to confusion when maintaining the code.
- self.ostream: IO[bytes] = stream
+ self.stream: IO[bytes] = stream
+ # ostream will be initialized in init() method with appropriate stream
856-858: Remove commented code or implement checksum validation.
The commented-out ChecksumAlgorithm parameter suggests incomplete implementation. Either implement the checksum validation or remove the commented code for a cleaner implementation.
- create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload(
-     Bucket=self.s3_bucket, Key=self.obj_key#, ChecksumAlgorithm="SHA256"
- )
+ create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload(
+     Bucket=self.s3_bucket, Key=self.obj_key
+ )
909-916: Remove commented checksum code.
The commented-out checksum calculation and validation code should either be implemented or removed for clarity and maintainability.
- # sha256_checksum: str = self._calculate_part_sha256(upload_data)
  response: Dict[str, Any] = self.s3_client.upload_part(
      Bucket=self.s3_bucket,
      Key=self.obj_key,
      Body=data,
      PartNumber=self.upload_index,
      UploadId=self.upload_id,
-     # ChecksumSHA256=sha256_checksum,
  )
919-923: Remove commented checksum code from the upload_status dictionary.
Similar to the previous comment, remove the commented checksum code for clarity.
  upload_status: Dict[str, int | str] = {
      "PartNumber": self.upload_index,
      "ETag": response["ETag"],
-     # "ChecksumSHA256": response["ChecksumSHA256"],
  }
926-929: Simplify the upload status tracking logic.
The current implementation is unnecessarily complex. Since upload_index tracks the current part number, you can directly append to the list.
- # Determine the part to which the new upload_status belongs
- if len(self.uploaded_parts) > self.upload_index - 1:
-     self.uploaded_parts[self.upload_index-1] = upload_status
- else:
-     self.uploaded_parts.append(upload_status)
+ # Append the upload status to our list
+ self.uploaded_parts.append(upload_status)
958-958: Remove commented checksum validation.
Remove the commented-out checksum validation code for consistency and clarity.
  "PartNumber": part["PartNumber"],
  "ETag": part["ETag"],
- # "ChecksumSHA256": part["ChecksumSHA256"],
853-853: Consider making the buffer size configurable.
The buffer size is currently hardcoded to 5 MB. Consider making this configurable through the constructor to allow flexibility for different use cases.
- self.buffer_size: int = 1024 * 1024 * 5
+ # Default buffer size: 5MB (S3 minimum for multipart upload except last part)
+ self.buffer_size: int = buffer_size if buffer_size is not None else 1024 * 1024 * 5
And update the constructor:
def __init__(
    self,
    s3_bucket: str,
    stream: Optional[IO[bytes]] = None,
    enable_compression: bool = True,
    timestamp_format: Optional[str] = None,
    timezone: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    buffer_size: Optional[int] = None,
) -> None:
873-876: Consider making the file extensions configurable.
The comment "HARD-CODED TO .clp.zst FOR NOW" suggests this is not the final implementation. Consider making file extensions configurable.
- # HARD-CODED TO .clp.zst FOR NOW
- if self.enable_compression:
-     new_filename = f"{self.remote_folder_path}/{upload_time}_log{file_count}.clp.zst"
- else:
-     new_filename = f"{self.remote_folder_path}/{upload_time}_log{file_count}.clp"
+ # Use configured extensions or defaults
+ extension = self.file_extension if hasattr(self, 'file_extension') else ".clp"
+ compression_ext = self.compression_extension if hasattr(self, 'compression_extension') else ".zst"
+
+ if self.enable_compression:
+     new_filename = f"{self.remote_folder_path}/{upload_time}_log{file_count}{extension}{compression_ext}"
+ else:
+     new_filename = f"{self.remote_folder_path}/{upload_time}_log{file_count}{extension}"
And add to the constructor:
self.file_extension: str = file_extension if file_extension is not None else ".clp"
self.compression_extension: str = compression_extension if compression_extension is not None else ".zst"
849-852: Consider adding an AWS region parameter.
AWS S3 operations are region-specific, but the region parameter is missing from the client initialization. Consider adding a region parameter to the constructor.
  self.s3_client: boto3.client
  if aws_access_key_id and aws_secret_access_key:
-     self.s3_client = boto3.client("s3", aws_access_key_id, aws_secret_access_key)
+     self.s3_client = boto3.client("s3",
+                                   aws_access_key_id=aws_access_key_id,
+                                   aws_secret_access_key=aws_secret_access_key,
+                                   region_name=region)
  else:
-     self.s3_client = boto3.client("s3")
+     self.s3_client = boto3.client("s3", region_name=region)
And update the constructor:
def __init__(
    self,
    s3_bucket: str,
    stream: Optional[IO[bytes]] = None,
    enable_compression: bool = True,
    timestamp_format: Optional[str] = None,
    timezone: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    region: Optional[str] = None,
) -> None:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/clp_logging/handlers.py
(2 hunks)
🔇 Additional comments (2)
src/clp_logging/handlers.py (2)
13-13: Updated typing imports properly.
The typing import line has been updated to include additional types needed for the new S3 handler functionality.
19-21: LGTM - Required imports added.
The new imports for datetime, io, and boto3 are necessary for the S3 handler implementation. The boto3 library is correctly imported for AWS S3 integration.
Actionable comments posted: 2
♻️ Duplicate comments (1)
src/clp_logging/handlers.py (1)
893-921: 🛠️ Refactor suggestion: Optimize _flush() calls to reduce API operations.
Calling _flush() on every write operation is inefficient, as it will make an API call to S3 each time, even if the buffer is nearly empty. Consider flushing only when necessary.
- self._flush()
+ # Only flush if we have enough data to make it worthwhile
+ if self.local_buffer.tell() >= self.buffer_size:
+     self._flush()
+     self.upload_index += 1
+     self.local_buffer.seek(0)
+     self.local_buffer.truncate(0)

- if self.local_buffer.tell() >= self.buffer_size:
-     # Rotate after 10000 parts (limitaion by s3)
-     if self.upload_index >= self.part_limit:
+     # Rotate after 10000 parts (limitation by S3)
+     if self.upload_index >= self.part_limit:
🧹 Nitpick comments (2)
src/clp_logging/handlers.py (2)
805-811: Good class docstring, but could be more detailed.
The class docstring clearly states the purpose of the handler. Consider expanding it to include information about multipart upload functionality, configuration requirements, and usage examples.
class CLPS3Handler(CLPBaseHandler):
    """
-   Log is written to stream in CLP IR encoding, and uploaded to s3_bucket
+   Log is written to stream in CLP IR encoding, and uploaded to an AWS S3 bucket
+   using multipart upload. The handler manages buffer rotation and completes
+   the upload when closed or when part limits are reached.
    :param s3_bucket: S3 bucket to upload CLP encoded log messages to
+   :param stream: Optional output stream, defaults to an internal buffer
+   :param enable_compression: Whether to compress the log data using Zstandard
+   :param timestamp_format: Format for timestamps in the log
+   :param timezone: Timezone for log timestamps
+   :param aws_access_key_id: Optional AWS access key ID
+   :param aws_secret_access_key: Optional AWS secret access key
+   :param part_limit: Maximum number of parts for multipart upload (defaults to 10,000)
    """
949-952: Simplify the uploaded_parts update logic.
The current logic for determining where to place the upload status in the array can be simplified.
- # Determine the part to which the new upload_status belongs
- if len(self.uploaded_parts) > self.upload_index - 1:
-     self.uploaded_parts[self.upload_index-1] = upload_status
- else:
-     self.uploaded_parts.append(upload_status)
+ # Update or append the upload status
+ while len(self.uploaded_parts) < self.upload_index:
+     self.uploaded_parts.append({})
+ self.uploaded_parts[self.upload_index-1] = upload_status
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/clp_logging/handlers.py
(2 hunks)
🔇 Additional comments (9)
src/clp_logging/handlers.py (9)
13-13: Include Union for proper type hint support.
Good update to the type imports to include the Union type, which is required for the new class.
19-24: Appropriate imports for S3 functionality.
All the necessary imports for S3 integration have been added: boto3 for the AWS SDK, io for buffer handling, base64/hashlib for checksums, and datetime for naming.
812-823: Proper initialization of compression context.
The init method correctly sets up the compression context and writes the preamble to the output stream.
847-873: Good error handling for AWS client initialization.
The code properly checks for AWS credentials and handles initialization errors appropriately.
875-891: Well-structured remote log naming.
The _remote_log_naming method creates a logical folder structure based on a date hierarchy and includes a timestamp in the filename. The method also handles file rotation by appending a counter when needed.
909-916: Correct error handling for the rotation case.
Good error handling for the rotation case, checking whether the upload ID is valid after creating a new multipart upload.
923-961: Strong data integrity validation with checksums.
The _flush method includes excellent checksum validation using SHA256, ensuring data integrity. The method also properly handles errors by aborting the multipart upload in case of exceptions.
962-991: Properly finalize multipart upload with appropriate error handling.
The _complete_upload method correctly writes the EOF character, flushes the buffer, and completes the multipart upload with all the necessary part information. The method also handles errors by aborting the upload in case of exceptions.
993-997: Good resource cleanup in the close method.
The close method properly completes the upload and closes all resources.
Actionable comments posted: 3
🧹 Nitpick comments (5)
src/clp_logging/handlers.py (5)
1125-1125: Line exceeds maximum length.
This line exceeds the project's maximum line length of 100 characters.
- self._remote_folder_path: str = f"{self.s3_directory}{self._start_timestamp.year}/{self._start_timestamp.month}/{self._start_timestamp.day}"
+ self._remote_folder_path: str = (
+     f"{self.s3_directory}{self._start_timestamp.year}/"
+     f"{self._start_timestamp.month}/{self._start_timestamp.day}"
+ )
🧰 Tools
🪛 Ruff (0.8.2)
1125-1125: Line too long (148 > 100)
(E501)
1122-1137: Consider making the timestamp format configurable.
The timestamp format in the remote file naming is hardcoded. Consider making this configurable to allow users to define their own file naming conventions.
- upload_time: str = str(int(self._start_timestamp.timestamp()))
+ # Use configurable timestamp format for file naming
+ upload_time: str = self._start_timestamp.strftime(self._file_timestamp_format) if hasattr(self, '_file_timestamp_format') else str(int(self._start_timestamp.timestamp()))
You could add a new parameter to the constructor:
file_timestamp_format: Optional[str] = None,  # Format for timestamp in filenames
🧰 Tools
🪛 Ruff (0.8.2)
1125-1125: Line too long (148 > 100)
(E501)
1154-1155: Consider optimizing flush operations.
The code currently writes the EOF character and then immediately flushes for non-multipart uploads. Consider writing both operations in a single block to reduce overhead.
- if not self.use_multipart_upload:
-     self._ostream.write(EOF_CHAR)
-     self._flush()
+ if not self.use_multipart_upload:
+     self._ostream.write(EOF_CHAR)
+     self._flush()
+ else:
+     self._flush()
1056-1057: Use an underscore prefix for private instance variables.
According to Python conventions, and as suggested in previous comments, instance variables that are meant to be private should be prefixed with an underscore.
- self.closed: bool = False
- self.enable_compression: bool = enable_compression
- self._local_buffer: io.BytesIO = io.BytesIO()
+ self._closed: bool = False
+ self._enable_compression: bool = enable_compression
+ self._local_buffer: io.BytesIO = io.BytesIO()
This should be applied consistently throughout the class.
1157-1178: Extract the rotation logic into a separate method.
The rotation logic in the _write method is complex and could be extracted into a separate method for better readability and maintainability.
- if self.use_multipart_upload and self._local_buffer.tell() >= self.upload_part_size:
-     # Rotate after maximum number of parts
-     if self._upload_index >= self.max_part_num:
-         self._complete_multipart_upload()
-         self._ostream.close()
-         self._local_buffer = io.BytesIO()
-         self._init_stream(self._local_buffer)
-         self._remote_file_count += 1
-         self._obj_key = self._remote_log_naming()
-         self._uploaded_parts = []
-         self._upload_index = 1
-         create_ret = self._s3_client.create_multipart_upload(
-             Bucket=self.s3_bucket, Key=self._obj_key, ChecksumAlgorithm="SHA256"
-         )
-         self._upload_id = create_ret["UploadId"]
-         if not self._upload_id:
-             raise RuntimeError("Failed to initialize new upload ID.")
-     else:
-         self._upload_index += 1
-     self._local_buffer.seek(0)
-     self._local_buffer.truncate(0)
+ if self.use_multipart_upload and self._local_buffer.tell() >= self.upload_part_size:
+     if self._upload_index >= self.max_part_num:
+         self._rotate_log_file()
+     else:
+         self._upload_index += 1
+     self._local_buffer.seek(0)
+     self._local_buffer.truncate(0)
And add a new method:
def _rotate_log_file(self) -> None:
    """
    Rotate to a new log file when the current one has reached the maximum part count.
    """
    self._complete_multipart_upload()
    self._ostream.close()
    self._local_buffer = io.BytesIO()
    self._init_stream(self._local_buffer)
    self._remote_file_count += 1
    self._obj_key = self._remote_log_naming()
    self._uploaded_parts = []
    self._upload_index = 1
    create_ret = self._s3_client.create_multipart_upload(
        Bucket=self.s3_bucket, Key=self._obj_key, ChecksumAlgorithm="SHA256"
    )
    self._upload_id = create_ret["UploadId"]
    if not self._upload_id:
        raise RuntimeError("Failed to initialize new upload ID.")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
pyproject.toml
(1 hunks)src/clp_logging/handlers.py
(4 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/clp_logging/handlers.py (1)
src/clp_logging/auto_generated_kv_pairs_utils.py (1)
AutoGeneratedKeyValuePairsBuffer
(19-68)
🪛 Ruff (0.8.2)
src/clp_logging/handlers.py
1125-1125: Line too long (148 > 100)
(E501)
1202-1202: Line too long (105 > 100)
(E501)
1240-1240: f-string without any placeholders
Remove extraneous f
prefix
(F541)
🔇 Additional comments (5)
pyproject.toml (2)
30-31: AWS SDK dependencies look appropriate for the new S3 handler.
The addition of boto3 and botocore with specific minimum versions provides the necessary AWS SDK support for the S3 handler functionality.
39-39: Good addition of moto for testing.
Adding the moto library is appropriate for testing the AWS S3 integration. It will allow S3 services to be mocked in unit tests without requiring actual AWS credentials or resources.
src/clp_logging/handlers.py (3)
23-29: Appropriate imports for AWS S3 integration.
The added imports provide the necessary functionality for AWS S3 integration, hashing, and buffer management.
51-55: Good use of constants for AWS S3 limits.
Using named constants for AWS S3 multipart upload limits improves readability and maintainability. This makes it clear what these values represent and why they are being used.
1017-1038: Well-documented class with clear parameter descriptions.
The class docstring clearly explains the purpose and parameters of the S3 handler. The parameter descriptions are comprehensive and help users understand how to configure the handler properly.
def close(self) -> None:
    """
    Complete the upload if needed. Close the stream and the handler.
    """
    if self.use_multipart_upload:
        self._complete_multipart_upload()
    self._ostream.close()
    self.closed = True
    super().close()
💡 Verification agent
🧩 Analysis chain
Add unit tests for the new S3 handler.
There are no unit tests for this new handler class. Consider adding comprehensive tests to verify its functionality, especially for the S3 integration.
🌐 Web query:
How to mock AWS S3 services for unit testing in Python?
💡 Result:
To mock AWS S3 services for Python unit testing, three robust approaches are commonly used, each with distinct advantages and use cases:
1. Moto Library (Simple Mocking)
Moto provides high-level AWS service mocking with minimal setup.
Example using pytest fixture [1][4][9]:
from moto import mock_aws
import boto3
import pytest
@mock_aws # Mocks all AWS services by default
def test_s3_upload():
s3 = boto3.client("s3")
s3.create_bucket(Bucket="test-bucket")
# Test S3 operations
s3.put_object(Bucket="test-bucket", Key="test.txt", Body="data")
obj = s3.get_object(Bucket="test-bucket", Key="test.txt")
assert obj["Body"].read().decode() == "data"
Pros:
- Simple decorator/context manager syntax
- Maintains state between operations (e.g., buckets persist across calls)
- Supports 130+ AWS services beyond S3 [4][9]
2. Botocore Stubs (Granular Control)
For precise control over API responses using AWS's native testing tools [1][6]:
from io import BytesIO

import boto3
from botocore.response import StreamingBody
from botocore.stub import Stubber
def test_s3_download():
s3 = boto3.client("s3")
stubber = Stubber(s3)
# Program expected response
stubber.add_response(
"get_object",
{"Body": StreamingBody(BytesIO(b"test"), len(b"test"))},
{"Bucket": "test-bucket", "Key": "test.txt"}
)
with stubber:
response = s3.get_object(Bucket="test-bucket", Key="test.txt")
assert response["Body"].read() == b"test"
Pros:
- Exact request/response validation
- No external dependencies
- Ideal for testing error handling and edge cases
3. LocalStack (Realistic Environment)
For integration-style testing with a full local S3 implementation [1][10]:
# docker-compose.yml
services:
localstack:
image: localstack/localstack
ports:
- "4566:4566" # S3 endpoint
environment:
- SERVICES=s3
Test setup:
import boto3
def test_with_localstack():
s3 = boto3.client(
"s3",
endpoint_url="http://localhost:4566",
aws_access_key_id="test",
aws_secret_access_key="test"
)
s3.create_bucket(Bucket="test-bucket")
# Regular S3 operations...
Pros:
- 100% API-compatible S3 implementation
- Supports advanced features like versioning and lifecycle rules
- Can test multi-service workflows [3][9]
Key Considerations
Factor | Moto | Botocore | LocalStack |
---|---|---|---|
Speed | ⚡️ Fastest | ⚡️ Fast | 🐢 Slowest |
Realism | Medium | Low | High |
Network Dependency | None | None | Required |
Setup Complexity | Low | Medium | High |
Best Practices [7][9]:
- Use Moto for most unit tests
- Combine with pytest.mark.parametrize for data-driven tests
- Employ LocalStack for pre-deployment integration testing
- Disable network access during tests using pytest-socket
For full test examples and configuration details, see the Moto documentation and LocalStack guides.
Citations:
- 1: https://www.sanjaysiddhanti.com/2020/04/08/s3testing/
- 2: https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/run-unit-tests-for-python-etl-jobs-in-aws-glue-using-the-pytest-framework.html
- 3: https://caylent.com/blog/mocking-aws-calls-using-moto
- 4: https://github.com/getmoto/moto
- 5: https://gist.github.com/Bakhtiyar-Garashov/ad97ed0de06077ea91fbf4e9219eb077
- 6: https://dev.to/panilo/boto3-and-python-unittestmock-5hji
- 7: https://aws.amazon.com/blogs/devops/unit-testing-aws-lambda-with-python-and-mock-aws-services/
- 8: https://gist.github.com/realFranco/7c3110c4efd2660f21d887b3b42b94a9
- 9: https://blog.newmathdata.com/advanced-unit-testing-in-aws-a666e787aa99
- 10: https://www.youtube.com/watch?v=NBICMF0i4Ok
- 11: https://dev.to/d4vsanchez/mocking-and-patching-in-python-for-unit-testing-3j08
- 12: https://datawookie.dev/blog/2023/08/mocking-s3-in-python-tests/
Action Required: Add Comprehensive Unit Tests for the S3 Handler
The implementation in src/clp_logging/handlers.py (lines 1279–1287) correctly finalises the handler's close sequence. However, there are no tests validating its S3 integration behaviours, especially around multipart uploads. Please add unit tests that cover:
- Verifying that _complete_multipart_upload() is called when use_multipart_upload is enabled.
- Ensuring that the output stream is properly closed and the handler's internal state is updated.
- Testing AWS S3 interactions using a suitable mocking strategy.
Consider using the Moto library for its simple, decorator-based approach to simulating AWS S3, or Botocore Stubs for precise control over AWS API responses. Alternatively, LocalStack can be used for integration-style testing, though it typically requires a more extensive setup.
f"It must be between {MIN_UPLOAD_PART_SIZE} and {MAX_UPLOAD_PART_SIZE}." | ||
) | ||
self.max_part_num: int = max_part_num if max_part_num else MAX_PART_NUM_PER_UPLOAD | ||
self._uploaded_parts: List[Dict[str, int | str]] = [] |
Fix type annotation for Python 3.7+ compatibility.
The type annotation uses the pipe operator (|), which is only supported in Python 3.10+. This needs to be fixed to maintain compatibility with Python 3.7+ as specified in the project requirements.
- self._uploaded_parts: List[Dict[str, int | str]] = []
+ self._uploaded_parts: List[Dict[str, Union[int, str]]] = []
📝 Committable suggestion
- self._uploaded_parts: List[Dict[str, int | str]] = []
+ self._uploaded_parts: List[Dict[str, Union[int, str]]] = []
Actionable comments posted: 0
🧹 Nitpick comments (4)
src/clp_logging/handlers.py (4)
1055-1086: Non-interface instance variables should be prefixed with an underscore.
According to Python naming conventions, attributes that aren't part of the public interface should be prefixed with an underscore.
- self.closed: bool = False
- self.enable_compression: bool = enable_compression
+ self._closed: bool = False
+ self._enable_compression: bool = enable_compression
1123-1140: Consider using a timestamp format for better log organization.
The current implementation uses a Unix timestamp for file naming. Consider using a more human-readable format that would make it easier to identify and browse logs.
- upload_time: str = str(int(self._start_timestamp.timestamp()))
+ upload_time: str = self._start_timestamp.strftime("%Y%m%d-%H%M%S")
🧰 Tools
🪛 Ruff (0.8.2)
1127-1127: Line too long (148 > 100)
(E501)
1127-1127: Reduce line length for better readability.
This line exceeds the recommended maximum length of 100 characters.
- self._remote_folder_path: str = f"{self.s3_directory}{self._start_timestamp.year}/{self._start_timestamp.month}/{self._start_timestamp.day}"
+ self._remote_folder_path: str = (
+     f"{self.s3_directory}{self._start_timestamp.year}/"
+     f"{self._start_timestamp.month}/{self._start_timestamp.day}"
+ )
🧰 Tools
🪛 Ruff (0.8.2)
1127-1127: Line too long (148 > 100)
(E501)
1142-1180: Consider optimizing upload frequency based on buffer size.
The current implementation calls _flush() on every write operation, which could lead to many small uploads. Consider only flushing when the buffer reaches a certain size.
- self._flush()
+ # Only flush if needed
+ if (not self.use_multipart_upload) or (self._local_buffer.tell() >= self.upload_part_size):
+     self._flush()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/clp_logging/handlers.py
(4 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/clp_logging/handlers.py (3)
src/clp_logging/auto_generated_kv_pairs_utils.py (1)
AutoGeneratedKeyValuePairsBuffer
(19-68)tests/test_handlers.py (6)
close
(127-128)close
(189-192)close
(332-337)close
(508-511)close
(554-557)close
(691-693)src/clp_logging/readers.py (2)
close
(208-209)close
(400-404)
🪛 Ruff (0.8.2)
src/clp_logging/handlers.py
1127-1127: Line too long (148 > 100)
(E501)
1204-1204: Line too long (105 > 100)
(E501)
1242-1242: f-string without any placeholders
Remove extraneous f
prefix
(F541)
🔇 Additional comments (8)
src/clp_logging/handlers.py (8)
1016-1040: LGTM: Well-documented class with clear parameters.
The class is well-structured and thoroughly documented with clear explanations for each parameter.
1041-1054: Parameters have sensible defaults and handle optional configuration.
The constructor accepts both required parameters (s3_bucket) and optional ones with sensible defaults, allowing for flexible configuration.
1087-1105: Fix type annotation for Python 3.7+ compatibility.
The type annotation uses the pipe operator (|), which is only supported in Python 3.10+. This needs to be fixed to maintain compatibility with earlier Python versions.
- self._uploaded_parts: List[Dict[str, int | str]] = []
+ self._uploaded_parts: List[Dict[str, Union[int, str]]] = []
1106-1122: LGTM: Good stream initialization with proper error handling.
The method correctly initializes the stream and handles compression based on the provided settings.
1182-1226: LGTM: Robust multipart upload implementation with checksum verification.
The multipart upload implementation correctly handles part uploads, checksums, and error scenarios. The abort operation in the exception handler ensures no partial uploads remain if an error occurs.
🧰 Tools
🪛 Ruff (0.8.2)
1204-1204: Line too long (105 > 100)
(E501)
1227-1246: Fix incorrect attribute references in PutObject validation.
The code references self.s3_client and self.obj_key instead of self._s3_client and self._obj_key.
- response = self.s3_client.head_object(Bucket=self.s3_bucket, Key=self.obj_key)
+ response = self._s3_client.head_object(Bucket=self.s3_bucket, Key=self._obj_key)
- raise ValueError(f"Checksum mismatch. Upload aborted.")
+ raise ValueError("Checksum mismatch. Upload aborted.")
🧰 Tools
🪛 Ruff (0.8.2)
1242-1242: f-string without any placeholders
Remove extraneous
f
prefix(F541)
1247-1280: LGTM: Clean implementation of multipart upload completion.
The code correctly completes the multipart upload, cleans up resources, and handles error scenarios appropriately.
1282-1290: Add enhanced unit testing for the S3 handler implementation.
There are no unit tests for this new class. Consider adding comprehensive tests to verify the functionality, especially for the S3 integration, using moto or a similar library to mock AWS services.
Actionable comments posted: 0
♻️ Duplicate comments (2)
src/clp_logging/handlers.py (2)
1097-1097: ⚠️ Potential issue: Fix type annotation for Python 3.7+ compatibility.
The type annotation uses the pipe operator (|), which is only supported in Python 3.10+. This needs to be fixed to maintain compatibility with Python 3.7+ as specified in the project requirements.
- self._uploaded_parts: List[Dict[str, int | str]] = []
+ self._uploaded_parts: List[Dict[str, Union[int, str]]] = []
1238-1238: ⚠️ Potential issue: Fix incorrect attribute reference in PutObject validation.
The code references self.obj_key instead of self._obj_key.
- response: Dict[str, Any] = self._s3_client.head_object(Bucket=self.s3_bucket, Key=self.obj_key)
+ response: Dict[str, Any] = self._s3_client.head_object(Bucket=self.s3_bucket, Key=self._obj_key)
🧰 Tools
🪛 Ruff (0.8.2)
1238-1238: Line too long (111 > 100)
(E501)
🧹 Nitpick comments (5)
src/clp_logging/handlers.py (5)
1056-1059: Use consistent naming conventions for instance variables.
Some instance variables are prefixed with _ while others aren't. According to Python naming conventions, non-interface instance variables should be prefixed with _ to indicate they're "private".
- self.closed: bool = False
- self.enable_compression: bool = enable_compression
+ self._closed: bool = False
+ self._enable_compression: bool = enable_compression
1127-1127: Line exceeds the recommended length limit.
This line is 148 characters long, exceeding the recommended 100-character limit.
- self._remote_folder_path: str = f"{self.s3_directory}{self._start_timestamp.year}/{self._start_timestamp.month}/{self._start_timestamp.day}"
+ year, month, day = self._start_timestamp.year, self._start_timestamp.month, self._start_timestamp.day
+ self._remote_folder_path: str = f"{self.s3_directory}{year}/{month}/{day}"
🧰 Tools
🪛 Ruff (0.8.2)
1127-1127: Line too long (148 > 100)
(E501)
1130-1131: Consider using a more human-readable timestamp format.
Using a raw UNIX timestamp in filenames works for sorting but may be less readable for humans. Consider providing an option for a more human-readable format or appending the UNIX timestamp after a readable date.
- upload_time: str = str(int(self._start_timestamp.timestamp()))
+ # Format: YYYYMMDD-HHMMSS-timestamp for both human readability and sorting
+ upload_time: str = f"{self._start_timestamp.strftime('%Y%m%d-%H%M%S')}-{int(self._start_timestamp.timestamp())}"
1242-1242: Remove unnecessary f-string.
This line uses an f-string without any dynamic values; it should be converted to a regular string.
- raise ValueError(f"Checksum mismatch. Upload aborted.")
+ raise ValueError("Checksum mismatch. Upload aborted.")
🧰 Tools
🪛 Ruff (0.8.2)
1242-1242: f-string without any placeholders
Remove extraneous
f
prefix(F541)
1017-1290: Add comprehensive unit tests for the S3 handler.
This new handler needs comprehensive unit tests, particularly to validate the S3 integration with proper mocking.
To facilitate testing of S3 operations without depending on actual AWS infrastructure, consider using the Moto library. Here's how you could set up a basic test:
import boto3
import io
import pytest
from moto import mock_aws

from clp_logging.handlers import CLPS3Handler


@mock_aws
def test_s3_handler_initialization():
    # Create a virtual S3 bucket
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="test-bucket")

    # Initialize the handler
    handler = CLPS3Handler(s3_bucket="test-bucket")

    # Verify the handler was properly initialized
    assert handler.s3_bucket == "test-bucket"
    assert not handler.closed

    # Close the handler to complete uploads
    handler.close()
A comprehensive test suite should cover:
- Basic initialization and configuration
- Writing and flushing logs
- Multipart upload behavior
- Error handling and retries
- File rotation
- Proper cleanup on close()
Would you like me to generate a more comprehensive test suite for this handler?
🧰 Tools
🪛 Ruff (0.8.2)
1127-1127: Line too long (148 > 100)
(E501)
1204-1204: Line too long (105 > 100)
(E501)
1238-1238: Line too long (111 > 100)
(E501)
1242-1242: f-string without any placeholders
Remove extraneous
f
prefix(F541)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/clp_logging/handlers.py
(4 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/clp_logging/handlers.py (1)
src/clp_logging/auto_generated_kv_pairs_utils.py (1)
AutoGeneratedKeyValuePairsBuffer
(19-68)
🪛 Ruff (0.8.2)
src/clp_logging/handlers.py
1127-1127: Line too long (148 > 100)
(E501)
1204-1204: Line too long (105 > 100)
(E501)
1238-1238: Line too long (111 > 100)
(E501)
1242-1242: f-string without any placeholders
Remove extraneous f
prefix
(F541)
🔇 Additional comments (9)
src/clp_logging/handlers.py (9)
23-28: Good organization of new imports.
The new imports required for AWS S3 functionality are well organized and clearly separated from the existing imports with a blank line, maintaining the project's import structure.
51-54: Well-defined constants for AWS S3 limitations.
Good practice defining these constants with clear, descriptive names rather than using magic numbers in the code. This makes the code more maintainable and self-documenting.
1017-1039: Comprehensive class docstring.
The class documentation clearly explains the purpose and all parameters of the handler, which helps with code maintainability and usability.
1068-1084: Good S3 configuration setup with proper error handling.
The S3 client initialization correctly handles credentials and has appropriate error handling for common AWS configuration issues. The flexibility to configure a custom S3 directory with proper path handling is also well implemented.
1085-1105: Well-implemented multipart upload initialization with validation.
The code properly validates the upload part size against AWS limitations and sets up the multipart upload with appropriate error handling. The flexibility to configure the maximum number of parts is also well implemented.
1142-1180: Well-implemented _write method with proper rotation handling.
The _write method efficiently manages local buffering of log entries and properly implements file rotation when the part count exceeds the maximum. The dual-mode support for multipart upload vs. PutObject is also well implemented.
1182-1226: Robust implementation of multipart upload with checksums.
The multipart upload implementation correctly manages part uploads, tracks ETags and checksums, and has proper error handling including abort functionality. The checksum validation provides good data integrity protection.
🧰 Tools
🪛 Ruff (0.8.2)
1204-1204: Line too long (105 > 100)
(E501)
1247-1280: Well-implemented multipart upload completion.
The method correctly finalizes the multipart upload by writing the EOF character, completing the upload with all parts, and properly aborting on errors. The error handling and part-list construction are well designed.
1282-1290: Clean implementation of the close method.
The close method properly completes the multipart upload when needed, closes resources, and maintains the handler's state. It also calls the parent class's close method, which is good practice.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/clp_logging/handlers.py (1)
1097-1097: ⚠️ Potential issue: Fix type annotation for Python compatibility.
The type annotation uses the pipe operator (|), which is only supported in Python 3.10+. This should be replaced with Union for better compatibility.
- self._uploaded_parts: List[Dict[str, int | str]] = []
+ self._uploaded_parts: List[Dict[str, Union[int, str]]] = []
🧹 Nitpick comments (3)
src/clp_logging/handlers.py (3)
1123-1139: Naming convention for remote logs is clear.
The method creates a well-structured path with timestamp and versioning. Consider breaking long lines for better readability.
- self._remote_folder_path: str = f"{self.s3_directory}{self._start_timestamp.year}/{self._start_timestamp.month}/{self._start_timestamp.day}"
+ self._remote_folder_path: str = (
+     f"{self.s3_directory}"
+     f"{self._start_timestamp.year}/"
+     f"{self._start_timestamp.month}/"
+     f"{self._start_timestamp.day}"
+ )
🧰 Tools
🪛 Ruff (0.8.2)
1127-1127: Line too long (148 > 100)
(E501)
1204-1204: Consider shortening the long line for better readability.
This line exceeds the 100-character limit recommended by static analysis.
- raise ValueError(f"Checksum mismatch for part {self._upload_index}. Upload aborted.")
+ raise ValueError(
+     f"Checksum mismatch for part {self._upload_index}. Upload aborted."
+ )
🧰 Tools
🪛 Ruff (0.8.2)
1204-1204: Line too long (105 > 100)
(E501)
1245-1245: Remove unnecessary f-string prefix.
This string doesn't contain any placeholders, so it doesn't need to be an f-string.
- raise ValueError(f"Checksum mismatch. Upload aborted.")
+ raise ValueError("Checksum mismatch. Upload aborted.")
🧰 Tools
🪛 Ruff (0.8.2)
1245-1245: f-string without any placeholders
Remove extraneous
f
prefix(F541)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/clp_logging/handlers.py
(4 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/clp_logging/handlers.py (1)
src/clp_logging/auto_generated_kv_pairs_utils.py (1)
AutoGeneratedKeyValuePairsBuffer
(19-68)
🪛 Ruff (0.8.2)
src/clp_logging/handlers.py
1127-1127: Line too long (148 > 100)
(E501)
1204-1204: Line too long (105 > 100)
(E501)
1245-1245: f-string without any placeholders
Remove extraneous f
prefix
(F541)
🔇 Additional comments (11)
src/clp_logging/handlers.py (11)
15-15: LGTM: Import statement updated to include the Union type.
The import statement is updated to include Union, which is necessary for the type hints in the new CLPS3Handler class.
23-29: LGTM: Necessary imports added for S3 functionality.
All the required imports for AWS S3 interaction, encoding, and buffering are properly added.
51-54: LGTM: Well-defined constants for S3 upload limits.
Constants are properly named and documented with correct values matching AWS S3 specifications.
1017-1056: Class and documentation look good.
The class definition and docstring provide clear explanations of each parameter and the overall functionality. The class properly extends CLPBaseHandler for the S3 functionality.
1057-1079: Good error handling for S3 client initialization.
The code properly handles potential AWS credential and client initialization errors with clear error messages.
1080-1105: Multipart upload initialization is well implemented.
The code correctly initializes multipart upload parameters, validates input, and creates the initial upload job with AWS S3.
1106-1122: LGTM: Stream initialization is well implemented.
The method properly initializes the compression context and writes the preamble to the output stream.
1142-1180: Write and file rotation logic look solid.
The method correctly handles writing log messages to the buffer, manages part rotation when limits are reached, and reinitializes multipart uploads when necessary.
1182-1249: Flush method handles both upload approaches properly.
The method supports both multipart and regular uploads with proper checksum verification for data integrity. Error handling is robust.
🧰 Tools
🪛 Ruff (0.8.2)
1204-1204: Line too long (105 > 100)
(E501)
1245-1245: f-string without any placeholders
Remove extraneous
f
prefix(F541)
1250-1283: Complete upload implementation looks good.
The method correctly finalizes multipart uploads, handles part information, and has proper error handling.
1285-1293: Close method correctly finalizes the upload process.
The method properly handles completion of the upload process and closes all resources.
                    Bucket=self.s3_bucket,
                    Key=self._obj_key,
                    Body=data,
                    PartNumber=self._upload_index,
                    UploadId=self._upload_id,
                    ChecksumSHA256=sha256_checksum,
                )

                # Verify integrity of the uploaded part using SHA256 Checksum
                if response["ChecksumSHA256"] != sha256_checksum:
                    raise ValueError(f"Checksum mismatch for part {self._upload_index}. Upload aborted.")

                # Store both ETag and SHA256 for validation
                upload_status: Dict[str, int | str] = {
                    "PartNumber": self._upload_index,
                    "ETag": response["ETag"],
                    "ChecksumSHA256": response["ChecksumSHA256"],
                }

                # Determine the part to which the new upload_status belongs
                if len(self._uploaded_parts) > self._upload_index - 1:
                    self._uploaded_parts[self._upload_index-1] = upload_status
                else:
                    self._uploaded_parts.append(upload_status)

            except Exception as e:
                self._s3_client.abort_multipart_upload(
                    Bucket=self.s3_bucket, Key=self._obj_key, UploadId=self._upload_id
                )
                raise Exception(
                    f'Multipart Upload on Part {self._upload_index}: {e}'
                ) from e
        else:
            self._ostream.write(EOF_CHAR)
            try:
                self._s3_client.put_object(
                    Bucket=self.s3_bucket,
                    Key=self._obj_key,
                    Body=data,
                    ContentEncoding='zstd' if self.enable_compression else 'binary',
                    ChecksumSHA256=sha256_checksum
                )

                # Verify integrity of the upload using SHA256 Checksum
                response: Dict[str, Any] = self._s3_client.head_object(
                    Bucket=self.s3_bucket,
                    Key=self._obj_key
                )
                if 'ChecksumSHA256' in response:
                    s3_checksum: str = response['ChecksumSHA256']
                    if s3_checksum != sha256_checksum:
                        raise ValueError(f"Checksum mismatch. Upload aborted.")

            except Exception as e:
                raise Exception(f'Failed to upload using PutObject: {e}')

    def _complete_multipart_upload(self) -> None:
        """
        Complete a multipart upload session and clear the local buffer.
        """
        # Flush EOF marker to the local buffer and upload
        self._ostream.write(EOF_CHAR)
        self._flush()
        self._local_buffer.seek(0)
        self._local_buffer.truncate(0)

        try:
            self._s3_client.complete_multipart_upload(
                Bucket=self.s3_bucket,
                Key=self._obj_key,
                UploadId=self._upload_id,
                MultipartUpload={
                    "Parts": [
                        {
                            "PartNumber": part["PartNumber"],
                            "ETag": part["ETag"],
                            "ChecksumSHA256": part["ChecksumSHA256"],
                        }
                        for part in self._uploaded_parts
                    ]
                },
            )
        except Exception as e:
            self._s3_client.abort_multipart_upload(
                Bucket=self.s3_bucket, Key=self._obj_key, UploadId=self._upload_id
            )
            raise Exception(
                f'Multipart Upload on Part {self._upload_index}: {e}'
            ) from e

    # override
    def close(self) -> None:
        """
        Complete the upload if needed. Close the stream and the handler.
        """
        if self.use_multipart_upload:
            self._complete_multipart_upload()
        self._ostream.close()
        self.closed = True
        super().close()
💡 Verification agent
🧩 Analysis chain
Add unit tests for the new S3 handler.
While the implementation looks solid, there are no unit tests for this new class. Consider adding comprehensive tests to verify the functionality, especially for the S3 integration, using moto or a similar library to mock AWS services.
I can help you with creating unit tests for this class. Here's a sample structure to consider:
🏁 Script executed:
#!/bin/bash
# Search for existing test patterns in the codebase
echo "Searching for existing test files..."
find . -type f -name "test_*.py" | head -5
Length of output: 152
Action: Add Unit Tests for the New S3 Handler
The new CLPS3Handler implementation currently lacks dedicated unit tests to verify its S3 integration and error management. Although there is an existing test suite (e.g. in ./tests/test_handlers.py), it does not appear to cover this new functionality. Please consider the following:
- Test Coverage: Add tests that cover both multipart and non-multipart upload workflows.
- Error Handling: Verify that errors such as invalid AWS credentials and checksum mismatches are handled appropriately.
- S3 Simulation: Implement tests using a library like moto to mock AWS S3 services, ensuring that operations like part rotations and complete/abort workflows are accurately simulated.
- Integration Scenarios: Ensure tests confirm that S3 interactions (like creating multipart uploads, uploading parts, and completing uploads) function as expected in various scenarios.
🧰 Tools
🪛 Ruff (0.8.2)
1127-1127: Line too long (148 > 100)
(E501)
1204-1204: Line too long (105 > 100)
(E501)
1245-1245: f-string without any placeholders
Remove extraneous f
prefix
(F541)
This PR adds CLPRemoteHandler to support S3 multipart uploading of compressed log files. Users can upload existing log files or upload log files that are handled by CLPFileHandler.
Description
Design and Structure
The CLPS3Handler class sends logging output to streams in CLP IR encoding and uploads streams to the specified AWS S3 bucket.
class CLPS3Handler (s3_bucket)
Returns a new instance of the CLPS3Handler class. Configures the provided AWS S3 bucket and calls init() to initialize the output stream and local buffer. Calls _remote_log_naming() to set up the object key. After that, the instance calls create_multipart_upload() from the AWS API to start an upload if multipart uploading is enabled.
Parameters:
s3_bucket: AWS S3 bucket to upload CLP encoded log messages to
stream: target stream to write log messages to
enable_compression: option to enable/disable stream compression
timestamp_format: timestamp format written in the preamble, to be used when generating logs with a reader
timezone: timezone written in the preamble, to be used when generating the timestamp from Unix epoch time
aws_access_key_id: user's public access key for the S3 bucket
aws_secret_access_key: user's private access key for the S3 bucket
use_multipart_upload: option to use multipart upload to upload stream segments, or use PutObject to upload the entire buffer
max_part_num: maximum number of parts allowed for a multipart upload session before uploading to a new object
upload_part_size: maximum size of a part in a multipart upload session before writing to a new part
Functions:
_init_stream
_remote_log_naming
_write()
If the local IO buffer exceeds a predetermined buffer size (5 MB), then clear the local buffer and increment the upload index so that new content of the local buffer will be written to the next part of the multipart upload.
If the upload index exceeds the maximum number of parts (self.max_part_num), then complete the current multipart upload and initiate a new multipart upload with a new object key (created using self._remote_log_naming()).
_flush()
If multipart upload is enabled, calls upload_part from the AWS API to upload the local buffer into the current segment (index identified by self._upload_index)
Records the upload status (part number and checksum values)
Verifies the integrity of the upload
Aborts the multipart upload if there is any exception
Otherwise, calls put_object from the AWS API to upload the entire local buffer
Verifies the integrity of the upload
Raises an exception if there is any error
_complete_multipart_upload()
close()
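A hypothetical end-to-end usage sketch of the handler described above (the bucket name and logger setup are illustrative, not part of the PR):

import logging

from clp_logging.handlers import CLPS3Handler

# Assumes the bucket already exists and AWS credentials are available
# (via `aws configure` or the constructor's access-key parameters).
handler = CLPS3Handler(s3_bucket="my-example-log-bucket")

logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Hello from CLPS3Handler")

# Closing the handler flushes the buffer and completes the upload.
handler.close()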
User Side Configuration Requirements
Please ensure the S3 bucket is created.
Validation performed
The following tests are performed manually.
Test method: Run the following code and verify functionality.
Test method: Set the segment limit to a small number and verify that the remaining segments are uploaded to a new file.
Unit tests will be added in future PRs to keep PRs to a manageable length. A detailed test plan will be provided.
Please let me know if there is any feedback or suggestions. Thanks!
Summary by CodeRabbit
- Enhanced log management now supports direct cloud uploads, ensuring reliable storage and efficient delivery of log data.
- Introduced a new logging handler that allows for configurable options including data compression, timestamp format, and timezone settings for improved log handling.
- Expanded optional dependencies for development and testing, enhancing project capabilities.
- Added support for new libraries to facilitate S3 interactions and testing.