mitigations.py
import requests
from anthropic import Anthropic
from mistralai import Mistral
from openai import OpenAI, AzureOpenAI
import streamlit as st
import google.generativeai as genai
from groq import Groq
from utils import process_groq_response, create_reasoning_system_prompt
# Function to create a prompt to generate mitigating controls
def create_mitigations_prompt(threats):
    """Build the user prompt that asks a model for STRIDE mitigations.

    Args:
        threats: The identified threats. The value is formatted straight
            into the prompt text.

    Returns:
        str: A prompt requesting a markdown table with the columns
        Threat Type, Scenario and Suggested Mitigation(s).

    Note:
        ``threats`` is embedded without escaping or delimiting, so any
        instruction-like text inside it reaches the model as part of the
        prompt. Callers that render the model's reply should treat it as
        untrusted markdown.
    """
    return f"""
Act as a cyber security expert with more than 20 years experience of using the STRIDE threat modelling methodology. Your task is to provide potential mitigations for the threats identified in the threat model. It is very important that your responses are tailored to reflect the details of the threats.
Your output should be in the form of a markdown table with the following columns:
- Column A: Threat Type
- Column B: Scenario
- Column C: Suggested Mitigation(s)
Below is the list of identified threats:
{threats}
YOUR RESPONSE (do not wrap in a code block):
"""
# Function to get mitigations from the GPT response.
def get_mitigations(api_key, model_name, prompt):
    """Request mitigations from an OpenAI chat-completions model.

    Args:
        api_key: API key handed to the ``OpenAI`` client.
        model_name: Model identifier. ``"o1"`` and ``"o3-mini"`` receive a
            structured system prompt built by
            ``create_reasoning_system_prompt``; every other model gets a
            short generic system prompt.
        prompt: The user message to send.

    Returns:
        The message content of the first choice in the response.
    """
    client = OpenAI(api_key=api_key)

    reasoning_models = ("o1", "o3-mini")
    if model_name not in reasoning_models:
        system_prompt = "You are a helpful assistant that provides threat mitigation strategies in Markdown format."
    else:
        # Reasoning models get an explicit task/approach breakdown.
        system_prompt = create_reasoning_system_prompt(
            task_description="Generate effective security mitigations for the identified threats using the STRIDE methodology.",
            approach_description="""1. Analyze each threat in the provided threat model
2. For each threat:
- Understand the threat type and scenario
- Consider the potential impact
- Identify appropriate security controls and mitigations
- Ensure mitigations are specific and actionable
3. Format the output as a markdown table with columns for:
- Threat Type
- Scenario
- Suggested Mitigation(s)
4. Ensure mitigations follow security best practices and industry standards""",
        )

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt},
    ]
    completion = client.chat.completions.create(
        model=model_name,
        messages=conversation,
    )
    # The reply is plain text, so the first choice's content is returned as-is.
    return completion.choices[0].message.content
# Function to get mitigations from the Azure OpenAI response.
def get_mitigations_azure(azure_api_endpoint, azure_api_key, azure_api_version, azure_deployment_name, prompt):
    """Request mitigations from an Azure OpenAI deployment.

    Args:
        azure_api_endpoint: Endpoint passed to the client as ``azure_endpoint``.
        azure_api_key: API key for the Azure OpenAI resource.
        azure_api_version: API version string passed to the client.
        azure_deployment_name: Deployment name, sent as the ``model`` value.
        prompt: The user message to send.

    Returns:
        The message content of the first choice in the response.
    """
    system_message = "You are a helpful assistant that provides threat mitigation strategies in Markdown format."
    client = AzureOpenAI(
        azure_endpoint=azure_api_endpoint,
        api_key=azure_api_key,
        api_version=azure_api_version,
    )
    completion = client.chat.completions.create(
        model=azure_deployment_name,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": prompt},
        ],
    )
    # The reply is plain text, so the first choice's content is returned as-is.
    return completion.choices[0].message.content
# Function to get mitigations from the Google model's response.
def get_mitigations_google(google_api_key, google_model, prompt):
    """Request mitigations from a Google generative model.

    Args:
        google_api_key: API key passed to ``genai.configure``.
        google_model: Model name passed to ``genai.GenerativeModel``.
        prompt: The prompt sent through ``generate_content``.

    Returns:
        The text of the first part of the first candidate, with literal
        backslash-n sequences turned into real newlines, or ``None`` when
        the response lacks that structure (``IndexError`` or
        ``AttributeError`` while reading it).
    """
    genai.configure(api_key=google_api_key)
    generative_model = genai.GenerativeModel(
        google_model,
        system_instruction="You are a helpful assistant that provides threat mitigation strategies in Markdown format.",
    )
    reply = generative_model.generate_content(prompt)

    try:
        raw_text = reply.candidates[0].content.parts[0].text
        # Turn the two-character sequence backslash + "n" into a newline.
        return raw_text.replace('\\n', '\n')
    except (IndexError, AttributeError):
        # No candidate text available; callers receive None.
        return None
# Function to get mitigations from the Mistral model's response.
def get_mitigations_mistral(mistral_api_key, mistral_model, prompt):
    """Request mitigations from a Mistral chat model.

    Args:
        mistral_api_key: API key handed to the ``Mistral`` client.
        mistral_model: Model identifier sent as ``model``.
        prompt: The user message to send.

    Returns:
        The message content of the first choice in the response.
    """
    system_message = "You are a helpful assistant that provides threat mitigation strategies in Markdown format."
    client = Mistral(api_key=mistral_api_key)
    completion = client.chat.complete(
        model=mistral_model,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": prompt},
        ],
    )
    # The reply is plain text, so the first choice's content is returned as-is.
    return completion.choices[0].message.content
# Function to get mitigations from Ollama hosted LLM.
def get_mitigations_ollama(ollama_endpoint, ollama_model, prompt):
    """
    Get mitigations from an Ollama hosted LLM.

    The prompt is POSTed as JSON to ``<ollama_endpoint>/api/chat`` with
    streaming disabled and a 60-second timeout.

    Args:
        ollama_endpoint (str): The URL of the Ollama endpoint (e.g., 'http://localhost:11434')
        ollama_model (str): The name of the model to use
        prompt (str): The prompt to send to the model

    Returns:
        str: The generated mitigations in markdown format

    Raises:
        requests.exceptions.RequestException: If there's an error communicating with the Ollama endpoint
        KeyError: If the response doesn't contain the expected fields

    Note:
        The request, including the full prompt, is sent to whatever host
        ``ollama_endpoint`` names; no scheme or host check happens here.
        NOTE(review): if the endpoint can be set by an untrusted user in a
        shared deployment, restrict it to expected hosts in the caller.
    """
    # Guarantee exactly one separator between the base URL and the API path.
    base_url = ollama_endpoint if ollama_endpoint.endswith('/') else ollama_endpoint + '/'
    url = base_url + "api/chat"

    system_message = {
        "role": "system",
        "content": """You are a cyber security expert with more than 20 years experience of implementing security controls for a wide range of applications. Your task is to analyze the provided application description and suggest appropriate security controls and mitigations.
Please provide your response in markdown format with appropriate headings and bullet points.""",
    }
    user_message = {"role": "user", "content": prompt}
    payload = {
        "model": ollama_model,
        "stream": False,
        "messages": [system_message, user_message],
    }

    # Request and lookup errors propagate to the caller unchanged.
    reply = requests.post(url, json=payload, timeout=60)
    reply.raise_for_status()  # Non-2xx status codes become exceptions.
    body = reply.json()
    return body["message"]["content"]
# Function to get mitigations from the Anthropic model's response.
def get_mitigations_anthropic(anthropic_api_key, anthropic_model, prompt):
    """Request mitigations from an Anthropic model, optionally with extended thinking.

    Args:
        anthropic_api_key: API key handed to the ``Anthropic`` client.
        anthropic_model: Model name. If it contains "thinking" (case
            insensitive) the request goes to "claude-3-7-sonnet-latest"
            with extended thinking enabled.
        prompt: The user message to send.

    Returns:
        str: The text of the response. In thinking mode only the "text"
        blocks are joined. If the request or the parsing raises, a
        markdown error report containing the exception text is returned
        instead of raising.

    Side effects:
        Stores any thinking content under
        ``st.session_state['last_thinking_content']`` and reports failures
        through ``st.error``.

    Note:
        The exception text is shown in the UI and embedded in the returned
        markdown. NOTE(review): confirm that client exception messages
        cannot carry request details that should stay out of the UI.
    """
    client = Anthropic(api_key=anthropic_api_key)

    # A model name containing "thinking" selects extended thinking; the
    # request itself is sent to the fixed model name below.
    is_thinking_mode = "thinking" in anthropic_model.lower()
    actual_model = "claude-3-7-sonnet-latest" if is_thinking_mode else anthropic_model

    try:
        request_args = {
            "model": actual_model,
            "system": "You are a helpful assistant that provides threat mitigation strategies in Markdown format.",
            "messages": [
                {"role": "user", "content": prompt}
            ],
        }
        if is_thinking_mode:
            request_args["max_tokens"] = 24000
            request_args["thinking"] = {
                "type": "enabled",
                "budget_tokens": 16000
            }
            request_args["timeout"] = 600  # 10-minute timeout
        else:
            request_args["max_tokens"] = 4096
            request_args["timeout"] = 300  # 5-minute timeout

        response = client.messages.create(**request_args)

        if not is_thinking_mode:
            # Regular responses: the first content block holds the text.
            return response.content[0].text

        # Thinking mode: keep only the text blocks for the result.
        mitigations = ''.join(block.text for block in response.content if block.type == "text")

        # Keep the thinking blocks in session state for debugging/transparency.
        thinking_content = ''.join(block.thinking for block in response.content if block.type == "thinking")
        if thinking_content:
            st.session_state['last_thinking_content'] = thinking_content

        return mitigations

    except Exception as e:
        # Timeouts and all other errors end up here; the caller gets a
        # markdown report rather than an exception.
        error_message = str(e)
        st.error(f"Error with Anthropic API: {error_message}")

        return f"""
## Error Generating Mitigations
**API Error:** {error_message}
### Suggestions:
- For complex applications, try simplifying the input or breaking it into smaller components
- If you're using extended thinking mode and encountering timeouts, try the standard model instead
- Consider reducing the complexity of the application description
"""
# Function to get mitigations from LM Studio Server response.
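def get_mitigations_lm_studio(lm_studio_endpoint, model_name, prompt):
    """Request mitigations from an LM Studio server via its OpenAI-compatible API.

    Args:
        lm_studio_endpoint: Base URL of the LM Studio server. Trailing
            slashes are stripped before "/v1" is appended.
        model_name: Model identifier sent as ``model``.
        prompt: The user message to send.

    Returns:
        The message content of the first choice in the response.
    """
    # Strip trailing slashes so an endpoint ending in "/" does not produce
    # "//v1"; get_mitigations_ollama normalises its endpoint the same way.
    base_endpoint = str(lm_studio_endpoint).rstrip("/")
    client = OpenAI(
        base_url=f"{base_endpoint}/v1",
        api_key="not-needed"  # LM Studio Server doesn't require an API key
    )
    response = client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": "You are a helpful assistant that provides threat mitigation strategies in Markdown format."},
            {"role": "user", "content": prompt}
        ]
    )
    # The reply is plain text, so the first choice's content is returned as-is.
    return response.choices[0].message.content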
# Function to get mitigations from the Groq model's response.
def get_mitigations_groq(groq_api_key, groq_model, prompt):
    """Request mitigations from a Groq-hosted model.

    Args:
        groq_api_key: API key handed to the ``Groq`` client.
        groq_model: Model identifier sent as ``model`` and passed on to
            ``process_groq_response``.
        prompt: The user message to send.

    Returns:
        The mitigations element of the pair returned by
        ``process_groq_response``.

    Side effects:
        When ``process_groq_response`` yields reasoning, it is written
        into a collapsed Streamlit expander.
    """
    system_message = "You are a helpful assistant that provides threat mitigation strategies in Markdown format."
    client = Groq(api_key=groq_api_key)
    completion = client.chat.completions.create(
        model=groq_model,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": prompt},
        ],
    )

    # The utility splits the raw reply into reasoning and mitigations.
    raw_reply = completion.choices[0].message.content
    reasoning, mitigations = process_groq_response(
        raw_reply,
        groq_model,
        expect_json=False,
    )

    # Show the reasoning, if any, in a collapsed expander.
    if reasoning:
        with st.expander("View model's reasoning process", expanded=False):
            st.write(reasoning)

    return mitigations