Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 75 additions & 41 deletions Packs/FeedDomainTools/Integrations/FeedDomainTools/FeedDomainTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
# disable insecure warnings
urllib3.disable_warnings()

RISK_THRESHOLD = 70


class DomainToolsClient(BaseClient):
"""
Expand All @@ -24,6 +26,8 @@ class DomainToolsClient(BaseClient):
NOH_FEED = "noh"
DOMAINRDAP = "domainrdap"
DOMAINDISCOVERY = "domaindiscovery"
DOMAINRISK = "domainrisk"
DOMAINHOTLIST = "domainhotlist"

FEED_URL = "/v1/feed"
DOMAINTOOLS_API_BASE_URL = "https://api.domaintools.com"
Expand All @@ -35,22 +39,21 @@ def __init__(
verify_ssl: bool = True,
proxy: bool = False,
tags: str = "",
tlp_color: str | None = None
tlp_color: str | None = None,
):
self.feed_type = "nod" # default to NOD feeds
self.tags = tags
self.tlp_color = tlp_color

if not (api_username and api_key):
raise DemistoException(
"The 'API Username' and 'API Key' parameters are required."
)
raise DemistoException("The 'API Username' and 'API Key' parameters are required.")

self.api_username = api_username
self.api_key = api_key

super().__init__(base_url=self.DOMAINTOOLS_API_BASE_URL, headers={
"Content-Type": "application/json"}, verify=verify_ssl, proxy=proxy)
super().__init__(
base_url=self.DOMAINTOOLS_API_BASE_URL, headers={"Content-Type": "application/json"}, verify=verify_ssl, proxy=proxy
)

def _get_dt_feeds(
self,
Expand All @@ -74,12 +77,11 @@ def _get_dt_feeds(
"domain": domain,
}

demisto.info(
f"Fetching DomainTools {feed_type_name} feed type with params: {query_params}"
)
demisto.info(f"Fetching DomainTools {feed_type_name} feed type with params: {query_params}")

response = self._http_request("GET", url_suffix=f"{self.FEED_URL}/{self.feed_type}/",
params=query_params, resp_type="text", raise_on_status=True)
response = self._http_request(
"GET", url_suffix=f"{self.FEED_URL}/{self.feed_type}/", params=query_params, resp_type="text", raise_on_status=True
)

results = response.strip().split("\n") if response else []
return results
Expand All @@ -99,9 +101,7 @@ def _format_parameter(self, key: str, value: Any) -> Any:

return value

def build_iterator(
self, feed_type: str = "nod", dt_feed_kwargs: dict = {}
) -> Iterator:
def build_iterator(self, feed_type: str = "nod", dt_feed_kwargs: dict = {}) -> Iterator:
"""
Retrieves all entries from the feed.

Expand All @@ -117,7 +117,7 @@ def build_iterator(

# DomainTools feeds optional arguments
session_id = dt_feed_kwargs.get("session_id", "dt-cortex-feeds")
top = int(dt_feed_kwargs.get("top", "5000"))
top = int(dt_feed_kwargs.get("top") or "5000")
domain = dt_feed_kwargs.get("domain")
after = dt_feed_kwargs.get("after")
before = dt_feed_kwargs.get("before")
Expand Down Expand Up @@ -156,10 +156,11 @@ def build_iterator(

timestamp = json_feed.get("timestamp", "")
indicator = json_feed.get("domain")
indicator_type = auto_detect_indicator_type(indicator)
indicator_type = FeedIndicatorType.Domain

# for `domainrdap` feed, we have more data to display including the parsed data.
parsed_record = json_feed.get("parsed_record", {})
overall_risk_score = json_feed.get("overall_risk", None)

if indicator and indicator_type:
yield {
Expand All @@ -168,7 +169,8 @@ def build_iterator(
"timestamp": timestamp,
"tags": ["DomainToolsFeeds", self.feed_type] + ud_tags,
"tlp_color": self.tlp_color,
"parsed_record": parsed_record
"parsed_record": parsed_record,
"overall_risk_score": overall_risk_score,
}

limit_counter += 1
Expand All @@ -177,9 +179,37 @@ def build_iterator(
demisto.info(f"Done processing {processed_feeds} out of {total_dt_feeds} {self.feed_type} feeds.")
except Exception as err:
demisto.debug(str(err))
raise ValueError(
f"Could not parse returned data as indicator. \n\nError massage: {str(err)}"
)
raise ValueError(f"Could not parse returned data as indicator. \n\nError massage: {str(err)}")


def get_dbot_score(overall_risk_score: int | None = None):
"""
Gets the DBot score
score info:
NONE = 0
GOOD = 1
SUSPICIOUS = 2
BAD = 3
Args:
overall_risk_score: The overall riskscore. Defaults to None.

Returns: DBot Score

"""
# Unknown scores
if overall_risk_score is None:
return Common.DBotScore.NONE

# check for the 'BAD' condition then return.
if overall_risk_score >= RISK_THRESHOLD:
return Common.DBotScore.BAD

# check for 'SUSPICIOUS' conditions as we know both scores will be lower.
if 50 <= overall_risk_score <= 69:
return Common.DBotScore.SUSPICIOUS

# If the domain is not BAD and not SUSPICIOUS, then return GOOD.
return Common.DBotScore.GOOD


def batch_create_indicators(indicators: list[dict[str, Any]], batch_size: int = 2000):
Expand All @@ -193,9 +223,7 @@ def batch_create_indicators(indicators: list[dict[str, Any]], batch_size: int =
demisto.createIndicators(iter_)


def fetch_indicators(
client: DomainToolsClient, feed_type: str = "nod", dt_feed_kwargs: dict[str, Any] = {}
) -> list[dict]:
def fetch_indicators(client: DomainToolsClient, feed_type: str = "nod", dt_feed_kwargs: dict[str, Any] = {}) -> list[dict]:
"""Retrieves indicators from the feed

Args:
Expand All @@ -215,6 +243,7 @@ def fetch_indicators(
tags_ = item.get("tags", [])
tlp_color_ = item.get("tlp_color")
parsed_record_ = item.get("parsed_record")
overall_risk_score_ = item.get("overall_risk_score")

indicator_tags = ",".join(tags_).rstrip(",")

Expand All @@ -235,14 +264,17 @@ def fetch_indicators(
"tags": indicator_tags,
"service": "DomainTools Feeds",
"firstseenbysource": timestamp_,
"sourcebrands": "FeedDomainTools"
"sourcebrands": "FeedDomainTools",
},
"rawJSON": raw_data,
}

if tlp_color_:
indicator_obj["fields"]["trafficlightprotocol"] = tlp_color_

if overall_risk_score_:
indicator_obj["score"] = get_dbot_score(overall_risk_score=overall_risk_score_)

indicators.append(indicator_obj)

if idx % 1000 == 0 or (idx < 1000 and idx % 100 == 0):
Expand Down Expand Up @@ -277,9 +309,7 @@ def get_indicators_command(client: DomainToolsClient, args: dict[str, str], para
}

demisto.debug(f"Fetching feed indicators by feed_type: {feed_type}")
indicators = fetch_indicators(
client, feed_type=feed_type, dt_feed_kwargs=dt_feeds_kwargs
)
indicators = fetch_indicators(client, feed_type=feed_type, dt_feed_kwargs=dt_feeds_kwargs)

human_readable = tableToMarkdown(
f"Indicators from DomainTools {feed_type.upper()} Feed:",
Expand All @@ -290,11 +320,7 @@ def get_indicators_command(client: DomainToolsClient, args: dict[str, str], para

batch_create_indicators(indicators, batch_size=100)

return CommandResults(
readable_output=human_readable,
raw_response=indicators,
ignore_auto_extract=True
)
return CommandResults(readable_output=human_readable, raw_response=indicators, ignore_auto_extract=True)


def fetch_indicators_command(client: DomainToolsClient, params: dict[str, Any] = {}) -> list[dict]:
Expand All @@ -318,7 +344,9 @@ def fetch_indicators_command(client: DomainToolsClient, params: dict[str, Any] =
client.NAD_FEED,
client.NOH_FEED,
client.DOMAINRDAP,
client.DOMAINDISCOVERY
client.DOMAINDISCOVERY,
client.DOMAINRISK,
client.DOMAINHOTLIST,
]

dt_feed_kwargs = {"top": top, "after": after, "session_id": session_id}
Expand All @@ -333,6 +361,7 @@ def fetch_indicators_command(client: DomainToolsClient, params: dict[str, Any] =
indicators = fetch_indicators(client, feed_type=feed_type, dt_feed_kwargs=dt_feed_kwargs)

fetched_indicators.extend(indicators)

return fetched_indicators


Expand All @@ -343,12 +372,11 @@ def test_module(client: DomainToolsClient, args: dict[str, str], params: dict[st
Returns:
str.
"""
dt_feed_kwargs = {
"top": 1,
"after": None
}
dt_feed_kwargs = {"top": 1, "after": None}

feed_type_ = params.get("feed_type", "NOD")
try:
next(client.build_iterator(dt_feed_kwargs=dt_feed_kwargs))
next(client.build_iterator(feed_type=feed_type_, dt_feed_kwargs=dt_feed_kwargs))
except Exception as e:
raise Exception(
"Could not fetch DomainTools Feed\n"
Expand All @@ -371,13 +399,19 @@ def main():
api_username = params.get("credentials", {}).get("identifier", "")
api_key = params.get("credentials", {}).get("password", "")
insecure = not params.get("insecure", False)
proxy = params.get('proxy', False)
proxy = params.get("proxy", False)
user_defined_tags = params.get("feedTags", "")
tlp_color = params.get("tlp_color")

try:
client = DomainToolsClient(api_username=api_username, api_key=api_key, verify_ssl=insecure,
proxy=proxy, tags=user_defined_tags, tlp_color=tlp_color)
client = DomainToolsClient(
api_username=api_username,
api_key=api_key,
verify_ssl=insecure,
proxy=proxy,
tags=user_defined_tags,
tlp_color=tlp_color,
)

demisto.debug(f"Command being called is {command}")
if command in commands:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ configuration:
- noh
- domainrdap
- domaindiscovery
- domainrisk
- domainhotlist
additionalinfo: The DomainTools feed type fo fetch. Defaults to 'ALL'.
section: Collect
- display: Fetch indicators
Expand Down Expand Up @@ -195,7 +197,7 @@ script:
default: false
required: false
secret: false
dockerimage: demisto/vendors-sdk:1.0.0.3242986
dockerimage: demisto/vendors-sdk:1.0.0.4577311
feed: true
isfetch: false
longRunning: false
Expand Down
Loading
Loading