Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,5 @@
],
"sla": 0,
"threshold": 72,
"fromVersion": "6.6.0"
"fromVersion": "5.5.0"
}
218 changes: 123 additions & 95 deletions Packs/DomainTools_Iris/Integrations/DomainTools_Iris/DomainTools_Iris.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import statistics
from datetime import datetime
from typing import Any
from collections.abc import Callable
from domaintools import API
from domaintools import utils
import urllib.parse
Expand Down Expand Up @@ -83,7 +84,20 @@
''' HELPER FUNCTIONS '''


def http_request(method, params=None):
def get_client(proxy_url: Optional[str] = None):
return API(
USERNAME,
API_KEY,
app_partner='cortex_xsoar',
app_name='iris-plugin',
app_version='2.1',
proxy_url=proxy_url,
verify_ssl=VERIFY_CERT,
always_sign_api_key=True
)


def http_request(method: str, params: dict = {}):
"""
HTTP request helper function
Args:
Expand All @@ -97,17 +111,8 @@ def http_request(method, params=None):
proxy_url = PROXIES.get('https') if PROXIES.get('https') != '' else PROXIES.get('http')
if not (USERNAME and API_KEY):
raise DemistoException("The 'API Username' and 'API Key' parameters are required.")
api = API(
USERNAME,
API_KEY,
app_partner='cortex_xsoar',
app_name='iris-plugin',
app_version='2.0',
proxy_url=proxy_url,
verify_ssl=VERIFY_CERT,
always_sign_api_key=True
)

api = get_client(proxy_url=proxy_url)
try:
if method == 'iris-investigate':
response = api.iris_investigate(params.get('domains')).response()
Expand All @@ -121,6 +126,8 @@ def http_request(method, params=None):
response = api.reverse_whois(**params).response()
elif method == 'parsed-whois':
response = api.parsed_whois(params.get('domain')).response()
elif method == "parsed-domain-rdap":
response = api.parsed_domain_rdap(query=params.get("domain"))
else:
response = api.iris_investigate(**params).response()
except Exception as e:
Expand Down Expand Up @@ -476,6 +483,15 @@ def parsed_whois(domain):
return http_request('parsed-whois', {'domain': domain})


def parsed_domain_rdap(domain: str):
resp = http_request("parsed-domain-rdap", params={"domain": domain})

return {
"_raw": resp.response(),
"flat": resp.flattened()
}


def add_key_to_json(cur, to_add):
if not cur:
return to_add
Expand Down Expand Up @@ -543,7 +559,7 @@ def format_enrich_output(result):
}

demisto_title = f'DomainTools Iris Enrich for {domain}.'
iris_title = 'Investigate [{0}](https://research.domaintools.com/iris/search/?q={0}) in Iris.'.format(domain)
iris_title = f'Investigate [{domain}](https://research.domaintools.com/iris/search/?q={domain}) in Iris.'
human_readable = tableToMarkdown(
f'{demisto_title} {iris_title}', human_readable_data, headers=PROFILE_HEADERS
)
Expand Down Expand Up @@ -1005,6 +1021,31 @@ def process_domains(
demisto.incidents(incidents)


def to_camel_case(value):
result = f' {value.strip()}'
result = re.sub(r' ([a-z,A-Z])', lambda g: g.group(1).upper(), result)
return result


def create_history_table(data, headers):
table = []
for row in data:
entry = {header: row.get(header) for header in headers}
table.append(entry)

return table


def change_keys(conv, obj):
output = {}
for key, value in obj.items():
if isinstance(value, dict):
output[conv(key)] = change_keys(conv, value)
else:
output[conv(key)] = value
return output


''' COMMANDS '''


Expand Down Expand Up @@ -1046,7 +1087,7 @@ def domain_command():
if not command_results_list:
return_warning("No results.", exit=True)

return_results(command_results_list)
return command_results_list


def domain_enrich_command():
Expand Down Expand Up @@ -1087,7 +1128,7 @@ def domain_enrich_command():
if not command_results_list:
return_warning("No results.", exit=True)

return_results(command_results_list)
return command_results_list


def domain_analytics_command():
Expand Down Expand Up @@ -1138,16 +1179,14 @@ def domain_analytics_command():
headers=headers,
)

return_results(
CommandResults(
outputs_prefix='DomainTools',
outputs_key_field='Name',
outputs=domaintools_context,
indicator=domain_indicator,
readable_output=human_readable,
raw_response=domain_result,
ignore_auto_extract=True,
)
return CommandResults(
outputs_prefix='DomainTools',
outputs_key_field='Name',
outputs=domaintools_context,
indicator=domain_indicator,
readable_output=human_readable,
raw_response=domain_result,
ignore_auto_extract=True,
)


Expand Down Expand Up @@ -1224,16 +1263,14 @@ def threat_profile_command():
human_readable_data,
headers=headers)

return_results(
CommandResults(
outputs_prefix='DomainTools',
outputs_key_field='Name',
outputs=domaintools_context,
indicator=domain_indicator,
readable_output=human_readable,
raw_response=domain_result,
ignore_auto_extract=True,
)
return CommandResults(
outputs_prefix='DomainTools',
outputs_key_field='Name',
outputs=domaintools_context,
indicator=domain_indicator,
readable_output=human_readable,
raw_response=domain_result,
ignore_auto_extract=True,
)


Expand Down Expand Up @@ -1331,20 +1368,13 @@ def domain_pivot_command():
sorted_output,
headers=headers)

results = CommandResults(
return CommandResults(
outputs_prefix='DomainTools.Pivots',
outputs_key_field='Value',
outputs=pivot_result,
readable_output=human_readable,
ignore_auto_extract=True
)
return_results(results)


def to_camel_case(value):
result = f' {value.strip()}'
result = re.sub(r' ([a-z,A-Z])', lambda g: g.group(1).upper(), result)
return result


def whois_history_command():
Expand Down Expand Up @@ -1393,23 +1423,13 @@ def whois_history_command():
if mode == 'check_existence':
human_readable = f'has history entries: {response.get("has_history_entries")}'

results = CommandResults(
return CommandResults(
outputs_prefix='DomainTools.History',
outputs_key_field='Value',
outputs=history_result,
readable_output=human_readable,
ignore_auto_extract=True
)
return_results(results)


def create_history_table(data, headers):
table = []
for row in data:
entry = {header: row.get(header) for header in headers}
table.append(entry)

return table


def hosting_history_command():
Expand Down Expand Up @@ -1451,14 +1471,13 @@ def hosting_history_command():
human_readable_all = human_readable_registrar + human_readable_ns + human_readable_ip
history_result = {'Value': domain, 'IPHistory': ip_table, 'NameserverHistory': ns_table, 'RegistrarHistory': registrar_table}

results = CommandResults(
return CommandResults(
outputs_prefix='DomainTools.History',
outputs_key_field='Value',
outputs=history_result,
readable_output=human_readable_all,
ignore_auto_extract=True
)
return_results(results)


def reverse_whois_command():
Expand All @@ -1484,24 +1503,14 @@ def reverse_whois_command():
context.append({'Name': domain})

all_context = {'Value': terms, 'Results': context}
results = CommandResults(

return CommandResults(
outputs_prefix='DomainTools.ReverseWhois',
outputs_key_field='Value',
outputs=all_context,
readable_output=human_readable,
ignore_auto_extract=True
)
return_results(results)


def change_keys(conv, obj):
output = {}
for key, value in obj.items():
if isinstance(value, dict):
output[conv(key)] = change_keys(conv, value)
else:
output[conv(key)] = value
return output


def parsed_whois_command():
Expand Down Expand Up @@ -1548,23 +1557,22 @@ def parsed_whois_command():
whois_record_date=parsed.get('updated_date'))]
)

results = CommandResults(
return CommandResults(
indicator=domain_indicator,
readable_output=human_readable,
raw_response=parsed,
ignore_auto_extract=True
)

return_results(results)


def test_module():
"""
Tests the API key for a user.
"""

http_request('iris-investigate', {'domains': 'demisto.com'})
demisto.results('ok')

return "ok"


def fetch_domains():
Expand All @@ -1590,38 +1598,58 @@ def fetch_domains():
)


def parsed_domain_rdap_command():
"""
Returns parsed domain rdap data in a given domain
"""
domain = demisto.args()['domain']
results = parsed_domain_rdap(domain=domain)

_raw_response = results.get("_raw") or {}
flat_response = results.get("flat") or {}

headers = list(flat_response.keys())

human_readable = tableToMarkdown(f'DomainTools parsed domain rdap result for {domain}', flat_response, headers=headers)

return CommandResults(
readable_output=human_readable,
raw_response=_raw_response,
ignore_auto_extract=True
)


def main():
"""
Main Demisto function.
"""
command_mapping: dict[str, Callable] = {
"test-module": test_module,
"domain": domain_command,
"domainProfile": parsed_domain_rdap_command,
"domaintoolsiris-investigate": domain_command,
"domaintoolsiris-analytics": domain_analytics_command,
"domaintoolsiris-threat-profile": threat_profile_command,
"domaintoolsiris-pivot": domain_pivot_command,
"domaintoolsiris-enrich": domain_enrich_command,
"domaintools-whois-history": whois_history_command,
"domaintools-hosting-history": hosting_history_command,
"domaintools-reverse-whois": reverse_whois_command,
"domaintools-whois": parsed_whois_command,
}

try:
if demisto.command() == 'test-module':
test_module()
elif demisto.command() == 'domain':
domain_command()
elif demisto.command() == 'domaintoolsiris-investigate':
domain_command()
elif demisto.command() == 'domaintoolsiris-analytics':
domain_analytics_command()
elif demisto.command() == 'domaintoolsiris-threat-profile':
threat_profile_command()
elif demisto.command() == 'domaintoolsiris-pivot':
domain_pivot_command()
elif demisto.command() == 'domaintoolsiris-enrich':
domain_enrich_command()
elif demisto.command() == 'domaintools-whois-history':
whois_history_command()
elif demisto.command() == 'domaintools-hosting-history':
hosting_history_command()
elif demisto.command() == 'domaintools-reverse-whois':
reverse_whois_command()
elif demisto.command() == 'domaintools-whois':
parsed_whois_command()
elif demisto.command() == 'fetch-incidents':
command = demisto.command()
demisto.debug(f"Command being called is {command}")
if command in command_mapping:
return_results(command_mapping[command]())
elif command == 'fetch-incidents':
fetch_domains()
else:
raise NotImplementedError(f"Command {command} is not supported.")
except Exception as e:
return_error(
f'Unable to perform command : {demisto.command()}, Reason: {str(e)}'
f'Unable to perform command : {command}, Reason: {str(e)}'
)


Expand Down
Loading
Loading