Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 192 additions & 29 deletions Packs/DomainTools_Iris/Integrations/DomainTools_Iris/DomainTools_Iris.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
""" HELPER FUNCTIONS """


def get_client(proxy_url: Optional[str] = None):
def get_client(proxy_url: str | None = None):
return API(
USERNAME,
API_KEY,
Expand All @@ -108,6 +108,7 @@ def http_request(method: str, params: dict = {}):
Returns: request result

"""
response = None
proxy_url = PROXIES.get("https") if PROXIES.get("https") != "" else PROXIES.get("http")
if not (USERNAME and API_KEY):
raise DemistoException("The 'API Username' and 'API Key' parameters are required.")
Expand All @@ -134,32 +135,53 @@ def http_request(method: str, params: dict = {}):
response = api.reverse_ip(domain=params.get("domain"), limit=params.get("limit")).response()
elif method == "host-domains":
response = api.host_domains(ip=params.get("ip"), limit=params.get("limit")).response()
elif method == "risk":
response = api.risk(domain=params.get("domain")).response()
else:
response = api.iris_investigate(**params).response()
except Exception as e:
demisto.error(str(e))
raise
# exclude for risk endpoint as it throws 404 status by default if domain was not found.
if method != "risk":
demisto.error(str(e))
raise

return response


def get_dbot_score(proximity_score, age, threat_profile_score):
def get_dbot_score(proximity_score, age=None, threat_profile_score=None, overall_risk_score=None):
"""
Gets the DBot score
info:
GOOD = 1
SUSPICIOUS = 2
BAD = 3
Args:
proximity_score: The proximity threat score deals with closeness to other malicious domains.
age: The age of the domain.
threat_profile_score: The threat profile score looking at things like phishing and spam.
overall_risk_score: The overall riskscore. Defaults to None.

Returns: DBot Score

"""
if proximity_score >= RISK_THRESHOLD or threat_profile_score >= RISK_THRESHOLD:
return 3
elif age < YOUNG_DOMAIN_TIMEFRAME and (proximity_score < RISK_THRESHOLD or threat_profile_score < RISK_THRESHOLD):
return 2
else:
return 1

# check for the 'BAD' condition then return.
if proximity_score and proximity_score >= RISK_THRESHOLD or (threat_profile_score and threat_profile_score >= RISK_THRESHOLD):
return Common.DBotScore.BAD

# check for 'SUSPICIOUS' conditions as we know both scores will be lower.
# this means we used the risk score endpoint which we don't check for the domain age
# and look for the overall risk score instead.
if overall_risk_score is not None:
if 50 <= overall_risk_score <= 69:
return Common.DBotScore.SUSPICIOUS

else: # otherwise, we look for the domain age.
if not age or age < YOUNG_DOMAIN_TIMEFRAME:
return Common.DBotScore.SUSPICIOUS

# If the domain is not BAD and not SUSPICIOUS, then return GOOD.
return Common.DBotScore.GOOD


def prune_context_data(data_obj):
Expand Down Expand Up @@ -459,6 +481,89 @@ def create_results(domain_result):
return outputs


def create_domain_risk_results(domain_result: dict):
"""
Creates all the context data necessary given a domain result
Args:
domain_result (dict): DomainTools domain data

Returns: dict {
domain: <Common.Domain> - Domain indicator object with Iris results that map to Domain context
domaintools: <dict> - DomainTools context with all Iris results
}

"""
domain = f"{domain_result.get('domain')}"

domain_risk_score_details = {
"overall_risk_score": domain_result.get("risk_score"),
"proximity": "",
"threat_profile": "",
"threat_profile_phishing": "",
"threat_profile_malware": "",
"threat_profile_spam": "",
}

for rc in domain_result.get("components") or []:
component_name = rc.get("name")
if component_name == "zerolist": # ignore zero list
continue
component_riskscore = rc.get("risk_score") or ""
domain_risk_score_details[component_name] = component_riskscore

domaintools_risk_context = {
"Name": domain,
"LastEnriched": datetime.now().strftime("%Y-%m-%d"),
"Analytics": {
"OverallRiskScore": domain_risk_score_details["overall_risk_score"],
"ProximityRiskScore": domain_risk_score_details["proximity"],
"MalwareRiskScGore": domain_risk_score_details["threat_profile_malware"],
"PhishingRiskScore": domain_risk_score_details["threat_profile_phishing"],
"SpamRiskScore": domain_risk_score_details["threat_profile_spam"],
"ThreatProfileRiskScore": {
"RiskScore": domain_risk_score_details["threat_profile"],
},
},
}

# get the db score
reliability = demisto.params().get("integrationReliability")
if domain_risk_score_details["overall_risk_score"] is None: # Score is Unknown
dbot_score = Common.DBotScore(
indicator=domain,
indicator_type=DBotScoreType.DOMAIN,
integration_name="DomainTools Iris",
score=Common.DBotScore.NONE,
reliability=reliability,
message="No current risk score found for this domain.",
)
else:
dbot_score_value = get_dbot_score(
proximity_score=domain_risk_score_details["proximity"],
age=None,
threat_profile_score=domain_risk_score_details["threat_profile"],
overall_risk_score=domain_risk_score_details["overall_risk_score"],
)

malicious_description = None
if dbot_score_value == Common.DBotScore.BAD:
malicious_description = "This domain has been profiled as a threat."

dbot_score = Common.DBotScore(
indicator=domain,
indicator_type=DBotScoreType.DOMAIN,
integration_name="DomainTools Iris",
score=dbot_score_value,
reliability=reliability,
malicious_description=malicious_description,
)

domain_indicator = Common.Domain(domain, dbot_score=dbot_score)

outputs = {"domain": domain_indicator, "domaintools": domaintools_risk_context}
return outputs


def domain_investigate(domain):
"""
Profiles domain and gives back all relevant domain data
Expand Down Expand Up @@ -538,6 +643,13 @@ def host_domains(ip: str, limit: int | None = None) -> dict:
return http_request("host-domains", params={"ip": ip, "limit": limit})


def get_domain_risk_score(domain: str) -> dict | None:
"""
Returns the domain risk score using /v1/risk endpoint instead using iris
"""
return http_request("risk", params={"domain": domain})


def add_key_to_json(cur, to_add):
if not cur:
return to_add
Expand Down Expand Up @@ -806,6 +918,34 @@ def format_investigate_output(result):
return (human_readable, indicators)


def format_risk_score_output(result):
RISK_SCORE_HEADERS = [
"Name",
"Last Enriched",
"Overall Risk Score",
"Proximity Risk Score",
"Threat Profile Risk Score",
]
domain = result.get("domain")
result_copy = copy.deepcopy(result)
indicators = create_domain_risk_results(result_copy)

domaintools_analytics_data = indicators.get("domaintools", {}).get("Analytics", {})

human_readable_data = {
"Name": f"[{domain}](https://domaintools.com)",
"Last Enriched": datetime.now().strftime("%Y-%m-%d"),
"Overall Risk Score": domaintools_analytics_data.get("OverallRiskScore", ""),
"Proximity Risk Score": domaintools_analytics_data.get("ProximityRiskScore", ""),
"Threat Profile Risk Score": domaintools_analytics_data.get("ThreatProfileRiskScore", {}).get("RiskScore", ""),
}

demisto_title = f"DomainTools Risk Score for {domain}."
human_readable = tableToMarkdown(f"{demisto_title}", human_readable_data, headers=RISK_SCORE_HEADERS)

return human_readable, indicators


def get_domain_risk_score_details(domain_risk: dict[str, Any]) -> dict[str, Any]:
"""Get the domain risk score details on a given domain risk

Expand Down Expand Up @@ -1073,34 +1213,57 @@ def domain_command():
e.g. !domain domain=domaintools.com
"""
domain = demisto.args()["domain"]
domain_result_type = demisto.params().get("domain_result_type") or "Iris"
domain_list = domain.split(",")
domain_chunks = chunks(domain_list, 100)
include_context = argToBoolean(demisto.args().get("include_context", True))
command_results_list: list[CommandResults] = []

for chunk in domain_chunks:
response = domain_investigate(",".join(chunk))
missing_domains = response.get("missing_domains")
for result in response.get("results", []):
human_readable_output, indicators = format_investigate_output(result)

if len(missing_domains) > 0:
human_readable_output += f"Missing Domains: {','.join(missing_domains)}"

domain_indicator = indicators.get("domain") if include_context else None
domaintools_context = indicators.get("domaintools") if include_context else None
command_results_list: list[CommandResults] = []

human_readable_output = ""
if domain_result_type.lower() == "verdict":
# get the risk score using DT risk endpoint
for domain in domain_list:
risk_score_result = get_domain_risk_score(domain=domain)
if risk_score_result is None:
risk_score_result = {"domain": domain}

human_readable_output, indicators = format_risk_score_output(risk_score_result)
domain_indicator = indicators.get("domain")
# for risk result (Verdict result type), create a indicator and add the verdict, no context needed.
command_results_list.append(
CommandResults(
outputs_prefix="DomainTools",
outputs_key_field="Name",
indicator=domain_indicator,
outputs=domaintools_context,
outputs=None,
readable_output=human_readable_output,
raw_response=result,
ignore_auto_extract=True,
raw_response=risk_score_result,
ignore_auto_extract=False,
)
)
else:
include_context = argToBoolean(demisto.args().get("include_context", True))
domain_chunks = chunks(domain_list, 100)
for chunk in domain_chunks:
response = domain_investigate(",".join(chunk))
missing_domains = response.get("missing_domains")
for result in response.get("results", []):
human_readable_output, indicators = format_investigate_output(result)

if len(missing_domains) > 0:
human_readable_output += f"Missing Domains: {','.join(missing_domains)}"

domain_indicator = indicators.get("domain") if include_context else None
domaintools_context = indicators.get("domaintools") if include_context else None

command_results_list.append(
CommandResults(
outputs_prefix="DomainTools",
outputs_key_field="Name",
indicator=domain_indicator,
outputs=domaintools_context,
readable_output=human_readable_output,
raw_response=result,
ignore_auto_extract=True,
)
)

if not command_results_list:
return_warning("No results.", exit=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ configuration:
type: 8
required: false
section: Connect
- display: Domain Result Type
name: domain_result_type
type: 15
required: false
additionalinfo: "Result type of the domain command: Iris returns full investigate results; Verdict returns only the domain risk score"
defaultvalue: Iris
options:
- Iris
- Verdict
section: Collect
- display: Source Reliability
name: integrationReliability
type: 15
Expand Down Expand Up @@ -2472,6 +2482,9 @@ script:
outputs:
- contextPath: Domain.Name
description: Name of the domain returned by the query.
# outputs:
# - contextPath: Domain.Name
# description: Name of the domain returned by the query.
dockerimage: demisto/vendors-sdk:1.0.0.3242986
runonce: false
script: '-'
Expand Down
22 changes: 5 additions & 17 deletions Packs/DomainTools_Iris/pack_metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,16 @@
"name": "DomainTools Iris Investigate",
"description": "Facilitates automation of key infrastructure characterization and hunting portions of the incident response process. Organizations will have access to essential domain profile, web crawl, SSL, and infrastructure data from within Cortex XSOAR. Requires a DomainTools Iris Investigate API key.",
"support": "partner",
"currentVersion": "2.2.3",
"currentVersion": "2.2.4",
"author": "DomainTools",
"url": "https://www.domaintools.com/support/",
"email": "[email protected]",
"created": "2020-04-14T00:00:00Z",
"categories": [
"Data Enrichment & Threat Intelligence"
],
"categories": ["Data Enrichment & Threat Intelligence"],
"tags": [],
"useCases": [],
"keywords": [],
"certification": "certified",
"marketplaces": [
"xsoar",
"marketplacev2",
"platform"
],
"supportedModules": [
"X1",
"X3",
"X5",
"ENT_PLUS",
"agentix"
]
}
"marketplaces": ["xsoar", "marketplacev2", "platform"],
"supportedModules": ["X1", "X3", "X5", "ENT_PLUS", "agentix"]
}
Loading