13
13
from flask import Flask , request , Response , json , redirect
14
14
from waitress import serve
15
15
import maxminddb
16
+ from oxl_utils .valid .net import valid_ip4 , valid_public_ip , valid_asn
16
17
17
18
app = Flask ('risk-db' )
18
19
BASE_DIR = Path ('/var/local/lib/risk-db' )
33
34
report_lock = Lock ()
34
35
35
36
36
- def _valid_ipv4 (ip : str ) -> bool :
37
- try :
38
- IPv4Address (ip )
39
- return True
40
-
41
- except AddressValueError :
42
- return False
43
-
44
-
45
- def _valid_public_ip (ip : str ) -> bool :
46
- ip = str (ip )
47
- try :
48
- ip = IPv4Address (ip )
49
- return ip .is_global and \
50
- not ip .is_loopback and \
51
- not ip .is_reserved and \
52
- not ip .is_multicast and \
53
- not ip .is_link_local
54
-
55
- except AddressValueError :
56
- try :
57
- ip = IPv6Address (ip )
58
- return ip .is_global and \
59
- not ip .is_loopback and \
60
- not ip .is_reserved and \
61
- not ip .is_multicast and \
62
- not ip .is_link_local
63
-
64
- except AddressValueError :
65
- return False
66
-
67
-
68
- def _valid_asn (_asn : str ) -> bool :
69
- return _asn .isdigit () and 0 <= int (_asn ) <= 4_294_967_294
70
-
71
-
72
37
def _safe_comment (cmt : str ) -> str :
73
38
return regex_replace (r'[^\sa-zA-Z0-9_=+.-]' , '' , cmt )[:50 ]
74
39
@@ -82,14 +47,14 @@ def _response_json(code: int, data: dict) -> Response:
82
47
83
48
84
49
def _get_ipv (ip : str ) -> int :
85
- if _valid_ipv4 (ip ):
50
+ if valid_ip4 (ip ):
86
51
return 4
87
52
88
53
return 6
89
54
90
55
91
56
def _get_src_ip () -> str :
92
- if _valid_public_ip (request .remote_addr ):
57
+ if valid_public_ip (request .remote_addr ):
93
58
return request .remote_addr
94
59
95
60
if 'X-Real-IP' in request .headers :
@@ -112,7 +77,7 @@ def report() -> Response:
112
77
if 'ip' in data and data ['ip' ].startswith ('::ffff:' ):
113
78
data ['ip' ] = data ['ip' ].replace ('::ffff:' , '' )
114
79
115
- if 'ip' not in data or not _valid_public_ip (data ['ip' ]):
80
+ if 'ip' not in data or not valid_public_ip (data ['ip' ]):
116
81
return _response_json (code = 400 , data = {'msg' : 'Invalid IP provided' })
117
82
118
83
if 'cat' not in data or data ['cat' ].lower () not in RISK_CATEGORIES :
@@ -123,7 +88,7 @@ def report() -> Response:
123
88
124
89
r = {
125
90
'ip' : data ['ip' ], 'cat' : data ['cat' ].lower (), 'time' : int (time ()),
126
- 'v' : 4 if _valid_ipv4 (data ['ip' ]) else 6 , 'cmt' : None , 'token' : None , 'by' : _get_src_ip ,
91
+ 'v' : 4 if valid_ip4 (data ['ip' ]) else 6 , 'cmt' : None , 'token' : None , 'by' : _get_src_ip ,
127
92
}
128
93
129
94
if 'cmt' in data :
@@ -145,7 +110,7 @@ def check(ip) -> Response:
145
110
if ip .startswith ('::ffff:' ):
146
111
ip = ip .replace ('::ffff:' , '' )
147
112
148
- if not _valid_public_ip (ip ):
113
+ if not valid_public_ip (ip ):
149
114
return _response_json (code = 400 , data = {'msg' : 'Invalid IP provided' })
150
115
151
116
try :
@@ -168,7 +133,7 @@ def check_net(ip) -> Response:
168
133
if ip .find ('/' ) != - 1 :
169
134
ip = ip .split ('/' , 1 )[0 ]
170
135
171
- if not _valid_public_ip (ip ):
136
+ if not valid_public_ip (ip ):
172
137
return _response_json (code = 400 , data = {'msg' : 'Invalid IP provided' })
173
138
174
139
ipv = _get_ipv (ip )
@@ -191,7 +156,7 @@ def check_net(ip) -> Response:
191
156
192
157
@app .route ('/api/asn/<nr>' , methods = ['GET' ])
193
158
def check_asn (nr ) -> Response :
194
- if not _valid_asn (nr ):
159
+ if not valid_asn (nr ):
195
160
return _response_json (code = 400 , data = {'msg' : 'Invalid ASN provided' })
196
161
197
162
try :
0 commit comments