20 changes: 10 additions & 10 deletions aws/logs_monitoring/caching/base_tags_cache.py
@@ -60,9 +60,9 @@ def write_cache_to_s3(self, data):
                 DD_S3_BUCKET_NAME, self.get_cache_name_with_prefix()
             )
             s3_object.put(Body=(bytes(json.dumps(data).encode("UTF-8"))))
-        except ClientError:
+        except ClientError as e:
             send_forwarder_internal_metrics("s3_cache_write_failure")
-            self.logger.debug("Unable to write new cache to S3", exc_info=True)
+            self.logger.debug(f"Unable to write new cache to S3: {e}", exc_info=True)

     def acquire_s3_cache_lock(self):
         """Acquire cache lock"""
@@ -76,16 +76,16 @@ def acquire_s3_cache_lock(self):
             last_modified_unix_time = get_last_modified_time(file_content)
             if last_modified_unix_time + DD_S3_CACHE_LOCK_TTL_SECONDS >= time():
                 return False
-        except Exception:
-            self.logger.debug("Unable to get cache lock file")
+        except Exception as e:
+            self.logger.debug(f"Unable to get cache lock file: {e}")

         # lock file doesn't exist, create file to acquire lock
         try:
             cache_lock_object.put(Body=(bytes("lock".encode("UTF-8"))))
             send_forwarder_internal_metrics("s3_cache_lock_acquired")
             self.logger.debug("S3 cache lock acquired")
-        except ClientError:
-            self.logger.debug("Unable to write S3 cache lock file", exc_info=True)
+        except ClientError as e:
+            self.logger.debug(f"Unable to write S3 cache lock file: {e}", exc_info=True)
             return False

         return True
@@ -99,9 +99,9 @@ def release_s3_cache_lock(self):
             cache_lock_object.delete()
             send_forwarder_internal_metrics("s3_cache_lock_released")
             self.logger.debug("S3 cache lock released")
-        except ClientError:
+        except ClientError as e:
             send_forwarder_internal_metrics("s3_cache_lock_release_failure")
-            self.logger.debug("Unable to release S3 cache lock", exc_info=True)
+            self.logger.debug(f"Unable to release S3 cache lock: {e}", exc_info=True)

     def get_cache_from_s3(self):
         """Retrieves tags cache from s3 and returns the body along with
@@ -113,9 +113,9 @@
             file_content = cache_object.get()
             tags_cache = json.loads(file_content["Body"].read().decode("utf-8"))
             last_modified_unix_time = get_last_modified_time(file_content)
-        except:
+        except Exception as e:
             send_forwarder_internal_metrics("s3_cache_fetch_failure")
-            self.logger.debug("Unable to fetch cache from S3", exc_info=True)
+            self.logger.debug(f"Unable to fetch cache from S3: {e}", exc_info=True)
             return {}, -1

         return tags_cache, last_modified_unix_time
12 changes: 6 additions & 6 deletions aws/logs_monitoring/caching/cloudwatch_log_group_cache.py
@@ -103,10 +103,10 @@ def _get_log_group_tags_from_cache(self, cache_file_name):
             )
             tags_cache = json.loads(response.get("Body").read().decode("utf-8"))
             last_modified_unix_time = int(response.get("LastModified").timestamp())
-        except Exception:
+        except Exception as e:
             send_forwarder_internal_metrics("loggroup_cache_fetch_failure")
             self.logger.exception(
-                "Failed to get log group tags from cache", exc_info=True
+                f"Failed to get log group tags from cache: {e}", exc_info=True
             )
             return None, -1

@@ -120,10 +120,10 @@ def _update_log_group_tags_cache(self, log_group, tags):
                 Key=cache_file_name,
                 Body=(bytes(json.dumps(tags).encode("UTF-8"))),
             )
-        except Exception:
+        except Exception as e:
             send_forwarder_internal_metrics("loggroup_cache_write_failure")
             self.logger.exception(
-                "Failed to update log group tags cache", exc_info=True
+                f"Failed to update log group tags cache: {e}", exc_info=True
             )

     def _is_expired(self, last_modified):
@@ -150,8 +150,8 @@ def _get_log_group_tags(self, log_group_arn):
             response = self.cloudwatch_logs_client.list_tags_for_resource(
                 resourceArn=log_group_arn
             )
-        except Exception:
-            self.logger.exception("Failed to get log group tags", exc_info=True)
+        except Exception as e:
+            self.logger.exception(f"Failed to get log group tags: {e}", exc_info=True)
         formatted_tags = None
         if response is not None:
             formatted_tags = [
4 changes: 2 additions & 2 deletions aws/logs_monitoring/caching/lambda_cache.py
@@ -43,8 +43,8 @@ def build_tags_cache(self):

         except ClientError as e:
             self.logger.exception(
-                "Encountered a ClientError when trying to fetch tags. You may need to give "
-                "this Lambda's role the 'tag:GetResources' permission"
+                f"Failed to fetch Lambda tags: {e}. "
+                "Add 'tag:GetResources' permission to the Forwarder's IAM role."
             )
             additional_tags = [
                 f"http_status_code:{e.response['ResponseMetadata']['HTTPStatusCode']}"
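The "tag:GetResources" permission named in the rewritten message is the IAM action behind the Resource Groups Tagging API call that this cache is built from. A minimal sketch of that call follows; the boto3 client setup, the "lambda" resource type filter, and the flat tag dict are illustrative assumptions, not the Forwarder's exact implementation.

import boto3

# Sketch: the GetResources call gated by the "tag:GetResources" IAM action.
client = boto3.client("resourcegroupstaggingapi")

tags_by_arn = {}
for page in client.get_paginator("get_resources").paginate(
    ResourceTypeFilters=["lambda"]
):
    for mapping in page["ResourceTagMappingList"]:
        tags_by_arn[mapping["ResourceARN"]] = {
            tag["Key"]: tag["Value"] for tag in mapping["Tags"]
        }
# Without the permission, this call raises the ClientError handled above.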
2 changes: 1 addition & 1 deletion aws/logs_monitoring/caching/s3_tags_cache.py
@@ -42,7 +42,7 @@ def build_tags_cache(self):
         except ClientError as e:
             self.logger.exception(
                 "Encountered a ClientError when trying to fetch tags. You may need to give "
-                "this Lambda's role the 'tag:GetResources' permission"
+                f"this Lambda's role the 'tag:GetResources' permission: {e}"
             )
             additional_tags = [
                 f"http_status_code:{e.response['ResponseMetadata']['HTTPStatusCode']}"
2 changes: 1 addition & 1 deletion aws/logs_monitoring/caching/step_functions_cache.py
@@ -51,7 +51,7 @@ def build_tags_cache(self):
         except ClientError as e:
             self.logger.exception(
                 "Encountered a ClientError when trying to fetch tags. You may need to give "
-                "this Lambda's role the 'tag:GetResources' permission"
+                f"this Lambda's role the 'tag:GetResources' permission: {e}"
             )
             additional_tags = [
                 f"http_status_code:{e.response['ResponseMetadata']['HTTPStatusCode']}"
14 changes: 8 additions & 6 deletions aws/logs_monitoring/forwarder.py
@@ -116,8 +116,10 @@ def _forward_logs(self, logs, key=None):
             for batch in batcher.batch(logs_to_forward):
                 try:
                     client.send(batch)
-                except Exception:
-                    logger.exception(f"Exception while forwarding log batch {batch}")
+                except Exception as e:
+                    logger.exception(
+                        f"Exception while forwarding log batch {batch}: {e}"
+                    )
                     failed_logs.extend(batch)
                 else:
                     if logger.isEnabledFor(logging.DEBUG):
@@ -142,9 +144,9 @@ def _forward_metrics(self, metrics, key=None):
         for metric in metrics:
             try:
                 send_log_metric(metric)
-            except Exception:
+            except Exception as e:
                 logger.exception(
-                    f"Exception while forwarding metric {json.dumps(metric)}"
+                    f"Exception while forwarding metric {json.dumps(metric)}: {e}"
                 )
                 failed_metrics.append(metric)
             else:
@@ -168,9 +170,9 @@ def _forward_traces(self, traces, key=None):
         try:
             serialized_trace_paylods = json.dumps(traces)
             self.trace_connection.send_traces(serialized_trace_paylods)
-        except Exception:
+        except Exception as e:
             logger.exception(
-                f"Exception while forwarding traces {serialized_trace_paylods}"
+                f"Exception while forwarding traces {serialized_trace_paylods}: {e}"
             )
             if DD_STORE_FAILED_EVENTS and not key:
                 self.storage.store_data(RetryPrefix.TRACES, traces)
39 changes: 24 additions & 15 deletions aws/logs_monitoring/lambda_function.py
@@ -4,41 +4,46 @@
 # Copyright 2021 Datadog, Inc.

 import json
-import os
-import boto3
 import logging
-import requests
+import os
 from hashlib import sha1

-from datadog_lambda.wrapper import datadog_lambda_wrapper
+import boto3
+import requests
 from datadog import api
-from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics
-from steps.parsing import parse
-from steps.enrichment import enrich
-from steps.transformation import transform
-from steps.splitting import split
+from datadog_lambda.wrapper import datadog_lambda_wrapper
+
 from caching.cache_layer import CacheLayer
+from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics
 from forwarder import Forwarder
 from settings import (
+    DD_ADDITIONAL_TARGET_LAMBDAS,
     DD_API_KEY,
-    DD_SKIP_SSL_VALIDATION,
     DD_API_URL,
     DD_FORWARDER_VERSION,
-    DD_ADDITIONAL_TARGET_LAMBDAS,
     DD_RETRY_KEYWORD,
+    DD_SITE,
+    DD_SKIP_SSL_VALIDATION,
 )
+from steps.enrichment import enrich
+from steps.parsing import parse
+from steps.splitting import split
+from steps.transformation import transform

 logger = logging.getLogger()
 logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))

 # DD_API_KEY must be set
 if DD_API_KEY == "<YOUR_DATADOG_API_KEY>" or DD_API_KEY == "":
-    raise Exception("Missing Datadog API key")
+    raise Exception(
+        "Missing Datadog API key. Set DD_API_KEY environment variable. "
+        "See: https://docs.datadoghq.com/serverless/forwarder/"
+    )
 # Check if the API key is the correct number of characters
 if len(DD_API_KEY) != 32:
     raise Exception(
-        "The API key is not the expected length. "
-        "Please confirm that your API key is correct"
+        f"Invalid Datadog API key format. Expected 32 characters, received {len(DD_API_KEY)}. "
+        f"Verify your API key at https://app.{DD_SITE}/organization-settings/api-keys"
     )
 # Validate the API key
 logger.debug("Validating the Datadog API key")
@@ -57,7 +62,11 @@
     timeout=10,
 )
 if not validation_res.ok:
-    raise Exception("The API key is not valid.")
+    raise Exception(
+        f"Datadog API key validation failed (HTTP {validation_res.status_code}). "
+        f"Verify your API key is correct and DD_SITE matches your Datadog account region (current: {DD_SITE}). "
+        "See: https://docs.datadoghq.com/getting_started/site/"
+    )

 # Force the layer to use the exact same API key and host as the forwarder
 api._api_key = DD_API_KEY
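The new validation error calls out DD_SITE mismatches as a likely cause. An API key can also be checked by hand against the validate endpoint; the sketch below assumes the US1 host (api.datadoghq.com) and a placeholder key, so adjust the host to whatever matches your DD_SITE.

import requests

# Sketch: manual check of a Datadog API key against the validate endpoint.
# The host assumes the US1 site; replace it to match DD_SITE.
resp = requests.get(
    "https://api.datadoghq.com/api/v1/validate",
    headers={"DD-API-KEY": "<YOUR_DATADOG_API_KEY>"},
    timeout=10,
)
print(resp.status_code, resp.json())  # a valid key returns {"valid": true}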
4 changes: 2 additions & 2 deletions aws/logs_monitoring/logs/datadog_http_client.py
@@ -95,8 +95,8 @@ def send(self, logs):
         """
         try:
             data = self._scrubber.scrub("[{}]".format(",".join(logs)))
-        except ScrubbingException:
-            raise Exception("could not scrub the payload")
+        except ScrubbingException as e:
+            raise Exception(f"could not scrub the payload: {e}")
         if DD_USE_COMPRESSION:
             data = compress_logs(data, DD_COMPRESSION_LEVEL)

7 changes: 5 additions & 2 deletions aws/logs_monitoring/logs/datadog_matcher.py
@@ -44,5 +44,8 @@ def match(self, log):

             return True

-        except ScrubbingException:
-            raise Exception("could not filter the payload")
+        except ScrubbingException as e:
+            raise Exception(f"Failed to filter log: {e}")
+
+        except Exception as e:
+            raise Exception(f"Failed to filter log: {e}")
10 changes: 5 additions & 5 deletions aws/logs_monitoring/logs/helpers.py
@@ -34,16 +34,16 @@ def compileRegex(rule, pattern):
         if pattern == "":
             # If pattern is an empty string, raise exception
             raise Exception(
-                "No pattern provided:\nAdd pattern or remove {} environment variable".format(
-                    rule
-                )
+                f"Empty pattern for {rule}. Set a valid regex pattern or remove the {rule} environment variable."
             )
         try:
             return re.compile(pattern)
-        except Exception:
+        except re.error as e:
             raise Exception(
-                "could not compile {} regex with pattern: {}".format(rule, pattern)
+                f"Invalid regex pattern for {rule}: '{pattern}'. Regex error: {e}"
             )
+        except Exception as e:
+            raise Exception(f"Failed to compile {rule} regex pattern '{pattern}': {e}")


 def add_retry_tag(log):
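The re.error branch added above is what an invalid pattern actually raises, while the generic Exception branch covers anything else. A short sketch of the distinction, with a deliberately broken pattern made up for illustration:

import re

try:
    re.compile("([unclosed")  # deliberately invalid pattern
except re.error as e:
    # re.error carries the syntax problem and its position in the pattern,
    # which is what the new "Invalid regex pattern" message surfaces.
    print(f"bad pattern: {e}")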
12 changes: 6 additions & 6 deletions aws/logs_monitoring/retry/storage.py
@@ -40,14 +40,14 @@ def store_data(self, prefix, data):
             self.s3_client.put_object(
                 Bucket=self.bucket_name, Key=key, Body=serialized_data
             )
-        except ClientError:
-            logger.error(f"Failed to store retry data for prefix {prefix}")
+        except ClientError as e:
Review comment (Contributor):

not related to this line in particular

I think it's also worth standardizing log.exception and log.error usage depending on what we catch. I noticed a mixed usage of log.error and log.exception in this file.

Reply (Contributor Author):

I'm not very aware of the difference between the two. Before I deep dive into the documentation, do you have any preferences / insights on which solution we should use?

logger.error(f"Failed to store retry data for prefix {prefix}: {e}")

def delete_data(self, key):
try:
self.s3_client.delete_object(Bucket=self.bucket_name, Key=key)
except ClientError:
logger.error(f"Failed to delete retry data for key {key}")
except ClientError as e:
logger.error(f"Failed to delete retry data for key {key}: {e}")

def _list_keys(self, prefix):
key_prefix = self._get_key_prefix(prefix)
@@ -68,8 +68,8 @@ def _fetch_data_for_key(self, key):
             body = response.get("Body")
             data = body.read()
             return self._deserialize(data)
-        except ClientError:
-            logger.error(f"Failed to fetch retry data for key {key}")
+        except ClientError as e:
+            logger.error(f"Failed to fetch retry data for key {key}: {e}")
             return None
         except Exception as e:
             logger.error(
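On the log.error vs log.exception question raised in the thread above: logger.exception logs at ERROR level and automatically appends the traceback of the exception currently being handled (equivalent to logger.error(..., exc_info=True)), so it only makes sense inside an except block; plain logger.error records the message alone. A minimal, self-contained sketch:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    1 / 0
except ZeroDivisionError as e:
    # Message only: no traceback unless exc_info=True is passed explicitly.
    logger.error(f"division failed: {e}")
    # ERROR level plus the full traceback of the exception being handled.
    logger.exception(f"division failed: {e}")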
8 changes: 4 additions & 4 deletions aws/logs_monitoring/steps/enrichment.py
@@ -152,9 +152,9 @@ def extract_ddtags_from_message(event):
                 message_dict = json.loads(event["message"])
                 extracted_ddtags = message_dict.pop(DD_CUSTOM_TAGS)
                 event["message"] = json.dumps(message_dict)
-            except Exception:
+            except Exception as e:
                 if logger.isEnabledFor(logging.DEBUG):
-                    logger.debug(f"Failed to extract ddtags from: {event}")
+                    logger.debug(f"Failed to extract ddtags from: {event}: {e}")
                 return

         # strip and cleanup spaces from extracted tags:
@@ -191,8 +191,8 @@ def extract_host_from_cloudtrails(event):
         if isinstance(message, str):
             try:
                 message = json.loads(message)
-            except json.JSONDecodeError:
-                logger.debug("Failed to decode cloudtrail message")
+            except json.JSONDecodeError as e:
+                logger.debug(f"Failed to decode cloudtrail message: {e}")
                 return

         # deal with s3 input type events
4 changes: 2 additions & 2 deletions aws/logs_monitoring/steps/handlers/aws_attributes.py
@@ -60,5 +60,5 @@ def set_account_region(self, arn):
             parts = arn.split(":")
             self.account = parts[4]
             self.region = parts[3]
-        except Exception:
-            raise Exception("Failed to parse account and region from ARN")
+        except Exception as e:
+            raise Exception(f"Failed to parse account and region from ARN: {e}")