From fdefd301510097fda4437e871d81160283b5c254 Mon Sep 17 00:00:00 2001
From: Adon Metcalfe
Date: Tue, 11 Oct 2022 14:52:51 +0800
Subject: [PATCH] added upload to LA and login refresh every hour

---
 Dockerfile |   6 +--
 main.py    | 113 +++++++++++++++++++++++++++++++++++++++++++++++------
 2 files changed, 105 insertions(+), 14 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 27f06fb..43f28ea 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -8,9 +8,9 @@ ENV AzureWebJobsScriptRoot=/home/site/wwwroot \
 WORKDIR /home/site/wwwroot
 
 COPY requirements.txt requirements.txt
-RUN pip install -r requirements.txt && \
-    az extension add -n log-analytics -y && \
-    az extension add -n resource-graph -y
+RUN pip install -r requirements.txt
+RUN az extension add -n log-analytics -y
+RUN az extension add -n resource-graph -y
 
 RUN curl -L https://aka.ms/downloadazcopy-v10-linux -o /tmp/azcopy.tar.gz && \
     cd /tmp && tar xf azcopy.tar.gz --strip 1 && rm azcopy.tar.gz && mv -v azcopy /usr/local/bin/azcopy
diff --git a/main.py b/main.py
index 8bc4a5b..72ee552 100644
--- a/main.py
+++ b/main.py
@@ -2,6 +2,10 @@
 import json
 import os
 import tempfile
+import hmac
+import base64
+import requests
+import hashlib
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timedelta
 from string import Template
@@ -14,6 +18,7 @@
 
 from cacheout import Cache
 from pathvalidate import sanitize_filepath
+
 from fastapi import FastAPI, BackgroundTasks
 
 Workspace = namedtuple("Workspace", "subscription, customerId, resourceGroup, name")
@@ -28,6 +33,13 @@
     raise Exception("Please set DATALAKE_BLOB_PREFIX and DATALAKE_SUBSCRIPTION env vars")
 
 email_footer = os.environ.get("FOOTER_HTML", "Set FOOTER_HTML env var to configure this...")
+la_customer_id = os.environ.get("LA_CUSTOMERID")
+la_shared_key = os.environ.get("LA_SHAREDKEY")
+
+app_state = {
+    "logged_in": False,
+    "login_time": datetime.utcnow() - timedelta(days=1)  # last login 1 day ago to force relogin
+}
 
 app = FastAPI()
 api = FastAPI(title="SIEM Query Utils")
@@ -37,6 +49,10 @@
 @cache.memoize(ttl=60)
 def azcli(cmd: list):
     "Run a general azure cli cmd"
+    if datetime.utcnow() - app_state["login_time"] > timedelta(hours=1):
+        login(refresh=True)
+    elif not app_state["logged_in"]:
+        login()
     cmd = ["az"] + cmd + ["--only-show-errors", "-o", "json"]
     try:
         result = check_output(cmd)
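The azcli() hunk above routes every CLI invocation through a login freshness check: if the last login is more than an hour old the helper re-authenticates (logging out first when running under a managed identity), otherwise it only logs in if it has never logged in at all. A minimal standalone sketch of that pattern, with the real az calls stubbed out (state, do_az_login and ensure_login are illustrative names, not part of main.py):

from datetime import datetime, timedelta

# module-level state mirroring app_state in main.py
state = {"logged_in": False, "login_time": datetime.utcnow() - timedelta(days=1)}

def do_az_login(refresh: bool = False):
    # stand-in for the real login(): `az logout` (when refreshing under a managed
    # identity) followed by `az login --identity` or `az account show`
    print("logging in, refresh =", refresh)
    state["logged_in"] = True
    state["login_time"] = datetime.utcnow()

def ensure_login(max_age=timedelta(hours=1)):
    # the same check azcli() performs before shelling out to `az`
    if datetime.utcnow() - state["login_time"] > max_age:
        do_az_login(refresh=True)   # stale login: force a refresh
    elif not state["logged_in"]:
        do_az_login()               # never logged in yet

ensure_login()   # first call triggers a login
ensure_login()   # within the hour: nothing to do

Because azcli() itself is wrapped in cache.memoize(ttl=60), repeated identical commands within a minute are served from cache and never reach this check.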
@@ -69,20 +85,38 @@ def generatesas(account=datalake_account, container=datalake_container, subscrip
     )
 
 
-if os.environ.get("IDENTITY_HEADER"):
-    # Use managed service identity to login
-    try:
-        azcli(["login", "--identity"])
-    except Exception as e:
-        # bail as we aren't able to login
-        print(e)
-        exit()
-
+def login(refresh: bool = False):
+    if os.environ.get("IDENTITY_HEADER"):
+        if refresh:
+            check_output(["az", "logout", "--only-show-errors", "-o", "json"])
+        # Use managed service identity to login
+        try:
+            check_output(["az", "login", "--identity", "--only-show-errors", "-o", "json"])
+            app_state["logged_in"] = True
+            app_state["login_time"] = datetime.utcnow()
+        except Exception as e:
+            # bail as we aren't able to login
+            print(e)
+            exit()
+    else:
+        try:
+            check_output(["az", "account", "show"])
+            app_state["logged_in"] = True
+            app_state["login_time"] = datetime.utcnow()
+        except Exception as e:
+            # bail as we aren't able to login
+            print(e)
+            exit()
 
 def loadkql(query):
     "If query starts with kql/ then load it from a local file and return text"
     if query.startswith("kql/"):
-        query = open(sanitize_filepath(query)).read().encode("utf-8").strip()
+        query = open(sanitize_filepath(query)).read().strip()
+    elif query.startswith("kql://"):
+        base_url = os.environ["KQL_BASEURL"]
+        path = sanitize_filepath(query.replace("kql://", ""))
+        url = f"{base_url}/{path}"
+        query = requests.get(url).text.strip()
     return query
 
 
@@ -172,16 +206,23 @@ def upload_results(results, blobdest, filenamekeys):
 
 
 @api.get("/globalQuery")
-def global_query(query: str, tasks: BackgroundTasks, timespan: str = "P7D", count: bool = False, blobdest: str = "", filenamekeys: str = ""):
+def global_query(
+    query: str, tasks: BackgroundTasks, timespan: str = "P7D", count: bool = False, blobdest: str = "", loganalyticsdest: str = "", filenamekeys: str = ""
+):
     """
     Query all workspaces with SecurityIncident tables using kusto.
     If blobdest is provided as a path the first 2 segments are assumed to be the location to save results to //.../
+    If loganalyticsdest is provided it defines a custom log table to upload results to using the LA_CUSTOMERID and LA_SHAREDKEY env vars
     Results are saved as individual .json files, and overwritten if they already exist.
     Filenamekeys are a comma-separated list of keys to build the filename from
     """
+    if loganalyticsdest:
+        assert la_customer_id and la_shared_key  # die if no env vars set
     results = analytics_query([ws.customerId for ws in list_workspaces()], query, timespan)
     if blobdest != "":
        tasks.add_task(upload_results, results, blobdest, filenamekeys)
+    if loganalyticsdest != "":
+        upload_loganalytics(results, loganalyticsdest)
     if count:
         return len(results)
     else:
@@ -390,6 +431,56 @@ def __missing__(self, key):
         return response
 
 
+# Build the API signature
+def build_la_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
+    x_headers = "x-ms-date:" + date
+    string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
+    bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
+    decoded_key = base64.b64decode(shared_key)
+    encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
+    authorization = "SharedKey {}:{}".format(customer_id, encoded_hash)
+    return authorization
+
+
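build_la_signature() above follows the documented signing scheme of the Azure Monitor HTTP Data Collector API: an HMAC-SHA256, keyed with the base64-decoded workspace shared key, over the string "POST\n<content-length>\napplication/json\nx-ms-date:<date>\n/api/logs", base64-encoded and sent as a SharedKey authorization header. A self-contained sketch of the headers it produces, using placeholder credentials (the workspace id and key below are dummies, not real values):

import base64, hashlib, hmac, json
from datetime import datetime

customer_id = "00000000-0000-0000-0000-000000000000"       # placeholder workspace id
shared_key = base64.b64encode(b"not-a-real-key").decode()  # placeholder primary key (base64)

body = json.dumps([{"Computer": "example", "Status": "ok"}])
rfc1123date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")

string_to_hash = "POST\n" + str(len(body)) + "\napplication/json\n" + "x-ms-date:" + rfc1123date + "\n/api/logs"
signature = base64.b64encode(
    hmac.new(base64.b64decode(shared_key), string_to_hash.encode("utf-8"), digestmod=hashlib.sha256).digest()
).decode()

headers = {
    "content-type": "application/json",
    "Authorization": f"SharedKey {customer_id}:{signature}",
    "Log-Type": "TestUpload",  # the Data Collector API stores this as the custom table TestUpload_CL
    "x-ms-date": rfc1123date,
}
print(headers)  # these headers plus the JSON body are what upload_loganalytics() POSTs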
+# Build and send a request to the Log Analytics POST API
+def upload_loganalytics(rows: list, log_type: str):
+    assert la_customer_id and la_shared_key  # die if no env vars set
+    table_name = f"{log_type}_CL"
+    existing_data = analytics_query([la_customer_id], table_name, "P1D")  # Scan past 24 hours for duplicates
+    existing_hashes = set()
+    digest_column = "_row_sha256"
+    for row in existing_data:
+        if isinstance(row, dict):
+            hashkeys = [k for k in row.keys() if k.startswith(digest_column)]
+            if len(hashkeys) == 1:
+                existing_hashes.add(row[hashkeys[0]])  # collect hashes of existing data
+    for item in rows:
+        for key in ["TenantId", "tenant", "TimeGenerated", "RawData"]:  # rename reserved columns
+            if key in item.keys():
+                item[key + "_orig"] = item.pop(key)
+        digest = hashlib.sha256(json.dumps(item, sort_keys=True).encode("utf8")).hexdigest()
+        if digest not in existing_hashes:
+            item[digest_column] = digest  # only add digest for new rows
+    body = json.dumps([item for item in rows if digest_column in item.keys()])  # dump new rows ready for upload
+    method = "POST"
+    content_type = "application/json"
+    resource = "/api/logs"
+
+
+    rfc1123date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
+    content_length = len(body)
+    signature = build_la_signature(la_customer_id, la_shared_key, rfc1123date, content_length, method, content_type, resource)
+    uri = "https://" + la_customer_id + ".ods.opinsights.azure.com" + resource + "?api-version=2016-04-01"
+
+    headers = {"content-type": content_type, "Authorization": signature, "Log-Type": log_type, "x-ms-date": rfc1123date}
+
+    response = requests.post(uri, data=body, headers=headers)
+    if response.status_code >= 200 and response.status_code <= 299:
+        print("Accepted")
+    else:
+        print("Response code: {}".format(response.status_code))
+
+
 def debug_server():
     "Run a debug server on port 8000 that doesn't need auth"
     import uvicorn
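upload_loganalytics() above deduplicates against the destination custom table before posting: each row is serialised with sort_keys=True and hashed with SHA-256, rows whose digest already appears in the table over the past day are skipped, and only rows that received the _row_sha256 column are uploaded (the startswith() match allows for the type suffix Log Analytics appends to custom columns, e.g. _row_sha256_s). A small isolated illustration of that digest-based filtering; the sample rows and the "existing" hash are made up:

import hashlib, json

def row_digest(row: dict) -> str:
    # sort_keys=True makes the digest independent of key order
    return hashlib.sha256(json.dumps(row, sort_keys=True).encode("utf8")).hexdigest()

existing_hashes = {row_digest({"Computer": "host1", "Status": "ok"})}  # pretend this came from the _CL table

incoming = [
    {"Status": "ok", "Computer": "host1"},   # same content, different key order -> duplicate, skipped
    {"Status": "err", "Computer": "host2"},  # new row
]

new_rows = []
for item in incoming:
    digest = row_digest(item)
    if digest not in existing_hashes:
        item["_row_sha256"] = digest  # tag new rows with their digest, as upload_loganalytics() does
        new_rows.append(item)

print(new_rows)  # only the host2 row would be uploaded

Note that the rename loop runs before hashing, so the digest reflects the row as it will actually be stored: TenantId, tenant, TimeGenerated and RawData are reserved column names and get an _orig suffix first.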