Skip to content

Commit 818e6bc

Browse files
test: Add threat intelligence integration tests
Add comprehensive integration tests for threat intelligence: - Test VirusTotal API integration (IP, hash queries) - Test Abuse.ch API integration (URL, hash queries) - Test OTX API integration (IP queries) - Test multi-source aggregation - Test cache functionality - End-to-end alert enrichment flow tests Tests can be run with API keys configured: VIRUSTOTAL_API_KEY=your_key pytest tests/integration/test_threat_intel.py
1 parent 7e041c0 commit 818e6bc

1 file changed

Lines changed: 178 additions & 0 deletions

File tree

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
# Copyright 2026 CCR <chenchunrun@gmail.com>
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Integration tests for Threat Intelligence Aggregator Service.
17+
18+
Tests real API integrations with VirusTotal, Abuse.ch, and OTX.
19+
"""
20+
21+
import asyncio
22+
import os
23+
import pytest
24+
25+
from services.threat_intel_aggregator.sources.virustotal import VirusTotalSource
26+
from services.threat_intel_aggregator.sources.abuse_ch import AbuseCHSource
27+
from services.threat_intel_aggregator.sources.otx import OTXSource
28+
29+
30+
class TestVirusTotalIntegration:
31+
"""Test VirusTotal API integration."""
32+
33+
@pytest.fixture
34+
def vt_source(self):
35+
"""Create VirusTotal source instance."""
36+
api_key = os.getenv("VIRUSTOTAL_API_KEY", "")
37+
return VirusTotalSource(api_key)
38+
39+
@pytest.mark.asyncio
40+
async def test_query_ip(self, vt_source):
41+
"""Test querying IP reputation."""
42+
if not vt_source.enabled:
43+
pytest.skip("VirusTotal API key not configured")
44+
45+
# Google DNS - should be safe
46+
result = await vt_source.query_ioc("8.8.8.8", "ip")
47+
48+
assert result is not None
49+
assert result["source"] == "virustotal"
50+
assert "detected" in result
51+
assert "country" in result or "as_owner" in result
52+
53+
@pytest.mark.asyncio
54+
async def test_query_hash(self, vt_source):
55+
"""Test querying file hash reputation."""
56+
if not vt_source.enabled:
57+
pytest.skip("VirusTotal API key not configured")
58+
59+
# EICAR test file signature (safe test file)
60+
eicar_hash = "44d88612fea8a8f36de82e1278abb02f"
61+
result = await vt_source.query_ioc(eicar_hash, "hash")
62+
63+
assert result is not None
64+
assert result["source"] == "virustotal"
65+
assert "detected" in result
66+
assert "positives" in result
67+
68+
@pytest.mark.asyncio
69+
async def test_cache_functionality(self, vt_source):
70+
"""Test that caching works correctly."""
71+
if not vt_source.enabled:
72+
pytest.skip("VirusTotal API key not configured")
73+
74+
# First query
75+
result1 = await vt_source.query_ioc("8.8.8.8", "ip")
76+
# Second query (should be cached)
77+
result2 = await vt_source.query_ioc("8.8.8.8", "ip")
78+
79+
assert result1 == result2
80+
81+
82+
class TestAbuseCHIntegration:
83+
"""Test Abuse.ch API integration."""
84+
85+
@pytest.fixture
86+
def abuse_source(self):
87+
"""Create Abuse.ch source instance."""
88+
return AbuseCHSource()
89+
90+
@pytest.mark.asyncio
91+
async def test_query_url(self, abuse_source):
92+
"""Test querying URL reputation."""
93+
# Test with a known safe URL
94+
result = await abuse_source.query_ioc("https://www.google.com", "url")
95+
96+
assert result is not None
97+
assert result["source"] == "abuse_ch"
98+
assert "detected" in result
99+
100+
@pytest.mark.asyncio
101+
async def test_query_hash(self, abuse_source):
102+
"""Test querying file hash."""
103+
# EICAR test file
104+
eicar_hash = "44d88612fea8a8f36de82e1278abb02f"
105+
result = await abuse_source.query_ioc(eicar_hash, "hash")
106+
107+
assert result is not None
108+
assert "detected" in result
109+
110+
111+
class TestOTXIntegration:
112+
"""Test AlienVault OTX API integration."""
113+
114+
@pytest.fixture
115+
def otx_source(self):
116+
"""Create OTX source instance."""
117+
api_key = os.getenv("OTX_API_KEY", "")
118+
return OTXSource(api_key)
119+
120+
@pytest.mark.asyncio
121+
async def test_query_ip(self, otx_source):
122+
"""Test querying IP reputation."""
123+
if not otx_source.enabled:
124+
pytest.skip("OTX API key not configured")
125+
126+
result = await otx_source.query_ioc("8.8.8.8", "ip")
127+
128+
assert result is not None
129+
assert result["source"] == "otx"
130+
assert "detected" in result
131+
132+
133+
class TestAggregation:
134+
"""Test threat intelligence aggregation."""
135+
136+
@pytest.mark.asyncio
137+
async def test_multiple_sources(self):
138+
"""Test querying multiple sources in parallel."""
139+
from services.threat_intel_aggregator.sources.aggregator import ThreatIntelAggregator
140+
141+
sources = [
142+
AbuseCHSource(), # Always available
143+
]
144+
145+
# Add optional sources if API keys are configured
146+
vt_key = os.getenv("VIRUSTOTAL_API_KEY")
147+
if vt_key and vt_key != "your_vt_key":
148+
sources.append(VirusTotalSource(vt_key))
149+
150+
otx_key = os.getenv("OTX_API_KEY")
151+
if otx_key and otx_key != "your_otx_key":
152+
sources.append(OTXSource(otx_key))
153+
154+
aggregator = ThreatIntelAggregator(sources)
155+
result = await aggregator.query_multiple_sources("8.8.8.8", "ip")
156+
157+
assert result is not None
158+
assert "ioc" in result
159+
assert "aggregate_score" in result
160+
assert "threat_level" in result
161+
assert "detected_by_count" in result
162+
assert "total_sources" in result
163+
assert result["total_sources"] >= 1
164+
165+
166+
@pytest.mark.integration
167+
class TestEndToEndThreatIntel:
168+
"""End-to-end tests for threat intelligence workflow."""
169+
170+
@pytest.mark.asyncio
171+
async def test_alert_enrichment_flow(self):
172+
"""Test complete alert enrichment with threat intel."""
173+
# This test requires running services
174+
pytest.skip("Requires running services - add to workflow tests")
175+
176+
177+
if __name__ == "__main__":
178+
pytest.main([__file__, "-v", "-s"])

0 commit comments

Comments
 (0)