This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

added upload to LA and login refresh every hour
adonm committed Oct 11, 2022
1 parent aeab443 commit fdefd30
Showing 2 changed files with 105 additions and 14 deletions.
6 changes: 3 additions & 3 deletions Dockerfile
@@ -8,9 +8,9 @@ ENV AzureWebJobsScriptRoot=/home/site/wwwroot \
WORKDIR /home/site/wwwroot

COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt && \
az extension add -n log-analytics -y && \
az extension add -n resource-graph -y
RUN pip install -r requirements.txt
RUN az extension add -n log-analytics -y
RUN az extension add -n resource-graph -y
RUN curl -L https://aka.ms/downloadazcopy-v10-linux -o /tmp/azcopy.tar.gz && \
cd /tmp && tar xf azcopy.tar.gz --strip 1 && rm azcopy.tar.gz && mv -v azcopy /usr/local/bin/azcopy

113 changes: 102 additions & 11 deletions main.py
@@ -2,6 +2,10 @@
import json
import os
import tempfile
import hmac
import base64
import requests
import hashlib
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from string import Template
@@ -14,6 +18,7 @@
from cacheout import Cache
from pathvalidate import sanitize_filepath


from fastapi import FastAPI, BackgroundTasks

Workspace = namedtuple("Workspace", "subscription, customerId, resourceGroup, name")
@@ -28,6 +33,13 @@
raise Exception("Please set DATALAKE_BLOB_PREFIX and DATALAKE_SUBSCRIPTION env vars")

email_footer = os.environ.get("FOOTER_HTML", "Set FOOTER_HTML env var to configure this...")
la_customer_id = os.environ.get("LA_CUSTOMERID")
la_shared_key = os.environ.get("LA_SHAREDKEY")

app_state = {
"logged_in": False,
"login_time": datetime.utcnow() - timedelta(days=1) # last login 1 day ago to force relogin
}

app = FastAPI()
api = FastAPI(title="SIEM Query Utils")
@@ -37,6 +49,10 @@
@cache.memoize(ttl=60)
def azcli(cmd: list):
"Run a general azure cli cmd"
if datetime.utcnow() - app_state["login_time"] > timedelta(hours=1):
login(refresh=True)
elif not app_state["logged_in"]:
login()
cmd = ["az"] + cmd + ["--only-show-errors", "-o", "json"]
try:
result = check_output(cmd)
@@ -69,20 +85,38 @@ def generatesas(account=datalake_account, container=datalake_container, subscrip
)


if os.environ.get("IDENTITY_HEADER"):
# Use managed service identity to login
try:
azcli(["login", "--identity"])
except Exception as e:
# bail as we aren't able to login
print(e)
exit()

def login(refresh: bool = False):
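# Log in with the azure cli: use the managed identity when IDENTITY_HEADER is set, otherwise rely on an existing az session; record the login time on success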
if os.environ.get("IDENTITY_HEADER"):
if refresh:
check_output(["az", "logout", "--only-show-errors", "-o", "json"])
# Use managed service identity to login
try:
check_output(["az", "login", "--identity", "--only-show-errors", "-o", "json"])
app_state["logged_in"] = True
app_state["login_time"] = datetime.utcnow()
except Exception as e:
# bail as we aren't able to login
print(e)
exit()
else:
try:
check_output(["az", "account", "show"])
app_state["logged_in"] = True
app_state["login_time"] = datetime.utcnow()
except Exception as e:
# bail as we aren't able to login
print(e)
exit()

def loadkql(query):
"If query starts with kql/ then load it from a local file and return text"
if query.startswith("kql/"):
query = open(sanitize_filepath(query)).read().encode("utf-8").strip()
query = open(sanitize_filepath(query)).read().strip()
elif query.startswith("kql://"):
base_url = os.environ["KQL_BASEURL"]
path = sanitize_filepath(query.replace("kql://", ""))
url = f"{base_url}/{path}"
query = requests.get(url).text.strip()
return query


@@ -172,16 +206,23 @@ def upload_results(results, blobdest, filenamekeys)


@api.get("/globalQuery")
def global_query(query: str, tasks: BackgroundTasks, timespan: str = "P7D", count: bool = False, blobdest: str = "", filenamekeys: str = ""):
def global_query(
query: str, tasks: BackgroundTasks, timespan: str = "P7D", count: bool = False, blobdest: str = "", loganalyticsdest: str = "", filenamekeys: str = ""
):
"""
Query all workspaces that have a SecurityIncident table using Kusto.
If blobdest is provided as a path, its first two segments are treated as the storage account and container: <account>/<container>/.../<filename>
If loganalyticsdest is provided, it names a custom log table to upload results to, using the LA_CUSTOMERID and LA_SHAREDKEY env vars
Results are saved as individual .json files and overwritten if they already exist.
filenamekeys is a comma separated list of keys used to build each filename
"""
if loganalyticsdest:
assert la_customer_id and la_shared_key # die if no env vars set
results = analytics_query([ws.customerId for ws in list_workspaces()], query, timespan)
if blobdest != "":
tasks.add_task(upload_results, results, blobdest, filenamekeys)
if loganalyticsdest != "":
upload_loganalytics(results, loganalyticsdest)
if count:
return len(results)
else:
@@ -390,6 +431,56 @@ def __missing__(self, key):
return response


# Build the API signature
def build_la_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
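# Log Analytics shared key scheme: sign "<METHOD>\n<content-length>\n<content-type>\nx-ms-date:<date>\n<resource>"
# with HMAC-SHA256 using the base64-decoded shared key, then base64-encode the digest into a SharedKey header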
x_headers = "x-ms-date:" + date
string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
decoded_key = base64.b64decode(shared_key)
encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
authorization = "SharedKey {}:{}".format(customer_id, encoded_hash)
return authorization


# Build and send a request to the Log Analytics POST API
def upload_loganalytics(rows: list, log_type: str):
assert la_customer_id and la_shared_key # die if no env vars set
table_name = f"{log_type}_CL"
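# De-duplicate against the last day of data in the target table: rows whose sha256 digest already exists are skipped below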
existing_data = analytics_query([la_customer_id], table_name, "P1D") # Scan past 24 hours for duplicates
existing_hashes = set()
digest_column = "_row_sha256"
for row in existing_data:
if isinstance(row, dict):
hashkeys = [k for k in row.keys() if k.startswith(digest_column)]
if len(hashkeys) == 1:
existing_hashes.add(row[hashkeys[0]]) # collect hashes of existing data
for item in rows:
for key in ["TenantId", "tenant", "TimeGenerated", "RawData"]: # rename reserved columns
if key in item.keys():
item[key + "_orig"] = item.pop(key)
digest = hashlib.sha256(json.dumps(item, sort_keys=True).encode("utf8")).hexdigest()
if digest not in existing_hashes:
item[digest_column] = digest # only add digest for new rows
body = json.dumps([item for item in rows if digest_column in item.keys()]) # dump new rows ready for upload
method = "POST"
content_type = "application/json"
resource = "/api/logs"


rfc1123date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
content_length = len(body)
signature = build_la_signature(la_customer_id, la_shared_key, rfc1123date, content_length, method, content_type, resource)
uri = "https://" + la_customer_id + ".ods.opinsights.azure.com" + resource + "?api-version=2016-04-01"

headers = {"content-type": content_type, "Authorization": signature, "Log-Type": log_type, "x-ms-date": rfc1123date}

response = requests.post(uri, data=body, headers=headers)
if response.status_code >= 200 and response.status_code <= 299:
print("Accepted")
else:
print("Response code: {}".format(response.status_code))

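# A minimal usage sketch (hypothetical rows and table name): rows already uploaded in the
# previous 24 hours are skipped by the digest check above.
#   upload_loganalytics([{"host": "web01", "result": "ok"}], "ExampleAudit")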

def debug_server():
"Run a debug server on port 8000 that doesn't need auth"
import uvicorn

1 comment on commit fdefd30

@adonm (Member, Author) commented on fdefd30 Oct 11, 2022


Resolves #3
