1 change: 1 addition & 0 deletions Solutions/ZeroFox/Data Connectors/Alerts/.funcignore
@@ -0,0 +1 @@
.venv
Binary file not shown.
@@ -0,0 +1,81 @@
import logging
import os
from datetime import datetime, timezone, timedelta

import azure.functions as func
from connections.sentinel import SentinelConnector
from connections.zerofox import ZeroFoxClient
from dateutil import parser


async def main(mytimer: func.TimerRequest) -> None:
now = datetime.now(timezone.utc)
utc_timestamp = now.isoformat()

if mytimer.past_due:
logging.info("The timer is past due!")

date_format = "%Y-%m-%dT%H:%M:%SZ"

# Environment variables for Logs Ingestion API
dce_endpoint = os.environ.get("DCE_ENDPOINT")
dcr_immutable_id = os.environ.get("DCR_IMMUTABLE_ID")
stream_name = os.environ.get("STREAM_NAME", "Custom-ZeroFoxAlertPoller_CL")

days_ago = os.environ.get("SinceDaysAgo", "0")

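    # Pick the query window: on the first run, look back SinceDaysAgo days;
    # afterwards resume from the timer's last recorded run, clamped to "now"
    # so a skewed schedule timestamp can never produce a window in the future.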
if is_first_run(mytimer):
query_from = (now - timedelta(days=float(days_ago))).strftime(date_format)
else:
query_from = min(
parse_last_update(mytimer), now
).strftime(date_format)
query_to = now.strftime(date_format)

logging.info(f"Querying ZeroFox alerts from {query_from} to {query_to}")

zerofox = get_zf_client()

log_type = "ZeroFoxAlertPoller"

sentinel = SentinelConnector(
dce_endpoint=dce_endpoint,
dcr_immutable_id=dcr_immutable_id,
stream_name=stream_name,
)

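    # SentinelConnector is used as an async context manager so any events
    # still queued when the loop finishes are flushed on exit.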
async with sentinel:
batches = zerofox.get_alerts(
last_modified_min_date=query_from,
last_modified_max_date=query_to,
)
for batch in batches:
await sentinel.send(batch)

if sentinel.failed_sent_events_number:
logging.error(f"Failed to send {sentinel.failed_sent_events_number} events")

logging.info(
f"Connector {log_type} ran at {utc_timestamp}, "
f"sending {sentinel.successfull_sent_events_number} events to Sentinel."
)


def is_first_run(mytimer: func.TimerRequest) -> bool:
    """Return True when the timer has no record of a previous successful run."""
    last_run = (mytimer.schedule_status or {}).get("Last")
    return not last_run or last_run == "0001-01-01T00:00:00+00:00"

def parse_last_update(mytimer: func.TimerRequest) -> datetime:
    """Return the timer's last run time, falling back to now when unavailable."""
    try:
        last = mytimer.schedule_status.get("Last")
        if last:
            parsed = parser.parse(last)
            # Treat a naive timestamp as UTC so it compares cleanly with
            # datetime.now(timezone.utc) in main().
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed
    except (AttributeError, KeyError, TypeError, ValueError):
        pass
    return datetime.now(timezone.utc)


def get_zf_client() -> ZeroFoxClient:
"""Create a ZeroFox client from environment variables."""
api_token = os.environ.get("ZeroFoxApiToken")
return ZeroFoxClient(api_token)
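A minimal sketch of how the same two classes could be exercised locally, outside the Functions host, is shown below. The placeholder endpoint, DCR ID, and token values are assumptions for illustration; in Azure they come from the Function App's application settings.

import asyncio
import os
from datetime import datetime, timedelta, timezone

from connections.sentinel import SentinelConnector
from connections.zerofox import ZeroFoxClient


async def poll_once() -> None:
    # Placeholder values for illustration only; real deployments read these
    # from the Function App application settings.
    token = os.environ.get("ZeroFoxApiToken", "<zerofox-api-token>")
    dce = os.environ.get("DCE_ENDPOINT", "https://dce-example.eastus-1.ingest.monitor.azure.com")
    dcr = os.environ.get("DCR_IMMUTABLE_ID", "dcr-00000000000000000000000000000000")
    stream = os.environ.get("STREAM_NAME", "Custom-ZeroFoxAlertPoller_CL")

    date_format = "%Y-%m-%dT%H:%M:%SZ"
    now = datetime.now(timezone.utc)
    query_from = (now - timedelta(days=1)).strftime(date_format)
    query_to = now.strftime(date_format)

    zerofox = ZeroFoxClient(token)
    sentinel = SentinelConnector(
        dce_endpoint=dce,
        dcr_immutable_id=dcr,
        stream_name=stream,
    )
    async with sentinel:
        # get_alerts yields one page of alerts at a time, so pages are streamed
        # to Sentinel without holding the whole result set in memory.
        for batch in zerofox.get_alerts(
            last_modified_min_date=query_from,
            last_modified_max_date=query_to,
        ):
            await sentinel.send(batch)


if __name__ == "__main__":
    asyncio.run(poll_once())

Uploads still authenticate through DefaultAzureCredential, so a local run needs Azure CLI credentials or the AZURE_CLIENT_ID / AZURE_CLIENT_SECRET / AZURE_TENANT_ID variables mentioned in the connector's own comments.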
@@ -0,0 +1,11 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "mytimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "0 */5 * * * *"
}
]
}
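The timer binding uses a six-field NCRONTAB expression; "0 */5 * * * *" fires at second zero of every fifth minute, so the poller runs every five minutes.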
@@ -0,0 +1,23 @@
import json

import requests


class ApiResponseException(Exception):
"""Represents event where response status code was not as expected."""

def __init__(self, method, url, res: requests.Response):
"""Construct exception based on endpoint address and response."""
err_msg = f"Error in {method} API call to endpoint {url}\n\
[{res.status_code}] - {res.reason}"
try:
# Try to parse json error response
error_entry = res.json()
err_msg += f"\n{json.dumps(error_entry)}"
except ValueError:
err_msg += f"\n{res.text}"
super().__init__(err_msg)




@@ -0,0 +1,175 @@
import asyncio
import json
import logging
from collections import deque
from typing import List, Dict, Any

from azure.identity import DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient
from azure.core.exceptions import HttpResponseError


class SentinelConnector:
"""
Connector for the Azure Monitor Logs Ingestion API.

This replaces the deprecated HTTP Data Collector API with the new
Logs Ingestion API that uses Microsoft Entra ID authentication.

Requires:
- Data Collection Endpoint (DCE) URI
- Data Collection Rule (DCR) Immutable ID
- Stream name (usually "Custom-{TableName}")
- Managed Identity or Azure AD credentials
"""

def __init__(
self,
dce_endpoint: str,
dcr_immutable_id: str,
stream_name: str,
queue_size: int = 2000,
queue_size_bytes: int = 25 * (2**20),
):
"""
Initialize the Sentinel connector.

Args:
dce_endpoint: Data Collection Endpoint URI
(e.g., https://dce-xxx.eastus-1.ingest.monitor.azure.com)
dcr_immutable_id: Data Collection Rule Immutable ID
(e.g., dcr-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx)
stream_name: Stream name defined in the DCR
(e.g., Custom-ZeroFoxAlertPoller_CL)
queue_size: Number of events to buffer before flushing
queue_size_bytes: Maximum size in bytes before splitting requests
"""
self.dce_endpoint = dce_endpoint
self.dcr_immutable_id = dcr_immutable_id
self.stream_name = stream_name
self.queue_size = queue_size
self.queue_size_bytes = queue_size_bytes
self._queue = deque()
self.lock = asyncio.Lock()
        self.successful_sent_events_number = 0
self.failed_sent_events_number = 0

# Use DefaultAzureCredential which supports:
# - Managed Identity (in Azure Functions)
# - Environment variables (AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
# - Azure CLI credentials (for local development)
self.credential = DefaultAzureCredential()
self.client = LogsIngestionClient(
endpoint=dce_endpoint,
credential=self.credential
)

async def send(self, batch: List[Dict[str, Any]]):
"""
Add batch to queue and flush if queue is full.

Args:
batch: List of log records to send
"""
self._queue.extend(batch)
if len(self._queue) >= self.queue_size:
await self.flush()
self._queue.clear()

async def flush(self):
"""Flush all queued events to Azure Monitor."""
await self._flush(list(self._queue))
self._queue.clear()

async def _flush(self, data: List[Dict[str, Any]]):
"""
Send data to Azure Monitor Logs Ingestion API.

Args:
data: List of log records to send
"""
if not data:
return

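        # Recursively split the payload so each request body serializes to
        # fewer than queue_size_bytes before handing batches to the client.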
split_data = self._split_big_request(data)

for batch in split_data:
await self._post_data(batch)

async def _post_data(self, body: List[Dict[str, Any]]):
"""
Post data using the Logs Ingestion API.

Args:
body: List of log records to send
"""
events_number = len(body)

try:
            # The SDK client is synchronous; run the upload in a worker thread
            # so the event loop is not blocked.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
None,
lambda: self.client.upload(
rule_id=self.dcr_immutable_id,
stream_name=self.stream_name,
logs=body
)
)

logging.info(
f"{events_number} events have been successfully sent to Microsoft Sentinel"
)
            self.successful_sent_events_number += events_number

except HttpResponseError as e:
logging.error(
f"Error during sending events to Microsoft Sentinel. "
f"Status: {e.status_code}, Message: {e.message}"
)
self.failed_sent_events_number += events_number
except Exception as e:
logging.error(f"Unexpected error sending events: {str(e)}")
self.failed_sent_events_number += events_number

async def __aenter__(self):
"""Async context manager entry."""
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit - flush remaining events and cleanup."""
await self.flush()
self.client.close()

def _check_size(self, queue: List) -> bool:
"""
Check if queue is within size limits.

Args:
queue: List of log records to check

Returns:
True if queue size is within limits
"""
data_bytes_len = len(json.dumps(queue).encode())
return data_bytes_len < self.queue_size_bytes

def _split_big_request(self, queue: List) -> List[List]:
"""
Split large requests into smaller batches recursively.

Args:
queue: List of log records to split

Returns:
List of batches, each within size limits
"""
if self._check_size(queue):
return [queue]
else:
middle = len(queue) // 2
queues_list = [queue[:middle], queue[middle:]]
return (
self._split_big_request(queues_list[0]) +
self._split_big_request(queues_list[1])
)
@@ -0,0 +1,95 @@
import logging
from typing import Generator, List, Dict, Any

import requests

from connections.exceptions import ApiResponseException

TIMEOUT = 60


class ZeroFoxClient:
"""ZeroFox API client for the Alerts API (v1.0)."""

def __init__(self, api_token: str) -> None:
self.api_token = api_token
self._base_url = "https://api.zerofox.com"

def get_alerts(
self,
last_modified_min_date: str,
last_modified_max_date: str,
page_size: int = 100,
) -> Generator[List[Dict[str, Any]], None, None]:
"""
Fetch alerts from ZeroFox API with pagination.

Args:
last_modified_min_date: ISO format datetime string for min date filter
last_modified_max_date: ISO format datetime string for max date filter
page_size: Number of results per page (default 100)

Yields:
List of alert dictionaries for each page
"""
url = f"{self._base_url}/1.0/alerts/"
headers = self._get_request_headers()
offset = 0

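        # The alerts endpoint pages results via offset/limit; keep requesting
        # until a short or empty page signals the end of the result set.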
while True:
params = {
"last_modified_min_date": last_modified_min_date,
"last_modified_max_date": last_modified_max_date,
"sort_direction": "asc",
"offset": offset,
"limit": page_size,
}

response = self._http_request(
method="GET",
url=url,
headers=headers,
params=params,
timeout=TIMEOUT,
)

alerts = response.get("alerts", [])
if not alerts:
break

yield alerts

# Check if there are more pages
if len(alerts) < page_size:
break

offset += page_size

def _http_request(
self,
method: str,
url: str,
timeout: float = TIMEOUT,
**kwargs,
) -> Dict[str, Any]:
"""Wrap request method for handling status codes."""
response = requests.request(
method=method,
url=url,
timeout=timeout,
**kwargs,
)
if response.status_code != 200:
logging.error(f"Failed to {method} {url}. Response: {response.text}")
raise ApiResponseException(method, url=url, res=response)
return response.json()

def _get_request_headers(self) -> Dict[str, str]:
"""Get headers for API requests using API Key auth."""
return {
"Authorization": f"Token {self.api_token}",
"Content-Type": "application/json",
"Accept": "application/json",
"zf-source": "Microsoft-Sentinel",
"User-Agent": "Microsoft-Sentinel-ZeroFox-Connector",
}