-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcoisc.py
executable file
·251 lines (184 loc) · 7.89 KB
/
coisc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#!/usr/bin/env python3
"""
Generate a list of DNS blocks for a given set of hosts.
"""
import argparse
import os.path
import re
import sys
import requests
# TODO: don't assume these default file names; allow them to be specified.

# File of "label url" lines whose URLs serve hosts-file-style content
# (an IP followed by a hostname on each line).
HOST_FILE_URLS = "blocks-hosts_style.txt"

# File of "label url" lines whose URLs serve a plain list of domains.
NONHOST_FILE_URLS = "blocks.txt"

# File containing a list of domains to block in their entirety.
DOMAIN_BLOCK_FILE = "blockdomains.txt"

# IPs that a downloaded hosts file is allowed to map hostnames to.
SAFE_IPS = [
    "0.0.0.0",
    "127.0.0.1",
    "255.255.255.255",
    "::1",
    "fe00::0",
    "ff00::0",
    "ff02::1",
    "ff02::2",
    "ff02::3",
]

# Names to filter out of domain lists - we can write these ourselves.
FILTER_DOMAIN = [
    "localhost",
    "localhost.localdomain",
]

# Address that every blocked name is pointed at in the generated output.
REDIRECT_IP = "127.0.0.1"

# User-Agent header sent when downloading block lists.
USER_AGENT = "Coisc (https://github.com/nosmo/coisc)"
class UnsupportedFormat(Exception):
    """Raised when an output format cannot express the requested block."""
def filter_url_list(url_name, url_list, ip_provided):
    """Filter downloaded block-list lines down to bare hostnames.

    Blank lines, comment lines and trailing "# comments" are removed.
    Lines consisting only of an IPv4 address are reported on stderr and
    skipped. In hosts-file mode, lines that are not exactly
    "<ip> <hostname>" are skipped.

    url_name: a label to identify the list (used in warnings)
    url_list: a list of raw line strings from the downloaded list
    ip_provided: is the IP provided each line? (/etc/hosts style)

    Returns a list of hostname strings in input order (duplicates kept).
    Raises SystemExit if a hosts-style line uses an IP not in SAFE_IPS.
    """
    filtered_list = []
    # Raw string: "\d" in a normal string literal is an invalid escape.
    ip_regexp = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")

    for raw_line in url_list:
        # Cut off any comment first, then strip, so that lines that
        # "have comments # here" do not keep trailing whitespace and
        # comment-only lines collapse to the empty string.
        url = raw_line.partition("#")[0].strip()
        if not url:
            continue

        if ip_regexp.match(url):
            sys.stderr.write(" - Found IP address {} when we expected a domain. Skipping\n".format(
                url))
            # Actually skip it, as the message promises.
            continue

        if ip_provided:
            url_split = url.split()
            # Only plain "<ip> <hostname>" lines are understood; anything
            # shorter or longer is ignored rather than crashing on unpack.
            if len(url_split) != 2:
                continue
            ip, hostname = url_split
            if ip not in SAFE_IPS:
                sys.stderr.write("WARNING!! Unsafe IP found in file {}: {}\n".format(
                    url_name, url))
                # TODO systemexit here is extreme
                # Exit with a non-zero status so callers can see the failure.
                raise SystemExit(1)
        else:
            hostname = url

        filtered_list.append(hostname)

    return filtered_list
def process_url_dict(url_dict, ip_provided):
    """Download and process a label:url dict of URLs containing blocks.

    Each URL is fetched with the module's User-Agent and its body is
    passed line by line through filter_url_list. A source that cannot be
    downloaded (connection error, timeout or HTTP error status) is
    reported on stderr and skipped; the remaining sources are still
    processed.

    url_dict: a label:url dict of URLs
    ip_provided: is the IP provided each line? (/etc/hosts style)

    Returns the concatenated list of hostnames from every source that
    downloaded successfully (duplicates are not removed).
    """
    # Seconds to wait for a connection / for data before giving up, so a
    # stalled server cannot hang the whole run.
    timeout_seconds = 30

    full_list = []
    for label, url in url_dict.items():
        print("Processing %s" % label)
        try:
            downloaded_req = requests.get(
                url, headers={"User-Agent": USER_AGENT}, timeout=timeout_seconds)
            # Treat 4xx/5xx as failures so error pages are never parsed
            # as if they were block lists.
            downloaded_req.raise_for_status()
        except requests.exceptions.RequestException as exc:
            sys.stderr.write("{}: Failed to download URL {}: {}\n".format(label, url, exc))
            # Nothing usable was downloaded for this label; move on
            # instead of touching an unbound or stale response.
            continue

        downloaded_list = downloaded_req.text.strip().splitlines()
        full_list += filter_url_list(label, downloaded_list, ip_provided)

    return full_list
# TODO make these their own class
def format_dnsmasq(redirect_ip, domain):
    """Build a dnsmasq "address=/<domain>/<ip>" line for a domain.

    Returns the line terminated by a newline.
    """
    line_template = "address=/{}/{}\n"
    return line_template.format(domain, redirect_ip)
def extract_dnsmasq(record_string):
    """Extract the domain from a dnsmasq "address=/<domain>/<ip>" line.

    The line is stripped and split on "/"; the second piece is returned.
    """
    pieces = record_string.strip().split("/")
    return pieces[1]
def format_hosts(redirect_ip, domain):
    """Build a hosts-file line pointing a domain at the redirect IP.

    Returns "<redirect_ip><TAB><domain>" terminated by a newline.
    """
    line_template = "{}\t{}\n"
    return line_template.format(redirect_ip, domain)
def extract_hosts(record_string):
    """Return the hostname (second whitespace-separated field) of a
    hosts-file line.
    """
    columns = record_string.split()
    return columns[1]
def format_bind(redirect_ip, domain):
    """Build a bind A record pointing a domain at the redirect IP.

    Returns "<domain>.<TAB>IN<TAB>A<TAB><redirect_ip>" terminated by a
    newline.
    """
    record_template = "{}.\tIN\tA\t{}\n"
    return record_template.format(domain, redirect_ip)
def extract_bind(record_string):
    """Extract the domain from a bind record line.

    The owner name is the first whitespace-separated field. A record in
    the "<domain>.<TAB>IN<TAB>A<TAB><ip>" form ends that field with a
    root dot, which is removed here so the result is the bare domain
    rather than "<domain>.".
    """
    owner_name = record_string.split()[0]
    if owner_name.endswith("."):
        owner_name = owner_name[:-1]
    return owner_name
def domain_block_dnsmasq(redirect_ip, domain):
    """Build a dnsmasq "address=/<domain>/<ip>" line for a domain block.

    Returns the line terminated by a newline.
    """
    # TODO this produces the same line as any other dnsmasq block, so
    # keeping a separate function for it is overkill and redundant.
    line_template = "address=/{}/{}\n"
    return line_template.format(domain, redirect_ip)
def domain_block_hosts(redirect_ip, domain):
    """Refuse to build a domain-wide block in hosts-file format.

    Domain-wide blocks are not supported for hosts files, so this always
    raises UnsupportedFormat; both arguments are ignored.
    """
    reason = "No top-level block support for hosts file"
    raise UnsupportedFormat(reason)
def domain_block_bind(redirect_ip, domain):
    """Refuse to build a domain-wide block in bind format.

    Domain-wide blocks are not supported for bind yet, so this always
    raises UnsupportedFormat; both arguments are ignored.
    """
    reason = "No top-level block support for bind"
    raise UnsupportedFormat(reason)
# Per-format function that pulls the domain back out of an output line.
EXTRACT_DICT = {
    "dnsmasq": extract_dnsmasq,
    "hosts": extract_hosts,
    "bind": extract_bind,
}

# Per-format function that renders one output line for a single domain.
OUTPUT_DICT = {
    "dnsmasq": format_dnsmasq,
    "hosts": format_hosts,
    "bind": format_bind,
}

# Per-format function that renders a domain-wide block line.
DOMAIN_OUTPUT_DICT = {
    "dnsmasq": domain_block_dnsmasq,
    "hosts": domain_block_hosts,
    "bind": domain_block_bind,
}
def urlfile_to_dict(file_path):
    """Parse a file of "label url" lines into a label:url dict.

    Each non-blank line must hold exactly two fields separated by any
    amount of whitespace (spaces or tabs). Blank lines are ignored. If a
    label appears more than once, the last URL wins.

    file_path: path of the file to read

    Returns a dict mapping each label to its URL.
    Raises ValueError if a non-blank line does not have exactly two
    fields.
    """
    url_dict = {}
    with open(file_path) as url_f:
        for line_number, url_str in enumerate(url_f, start=1):
            # split() with no argument tolerates tabs, repeated spaces
            # and surrounding whitespace.
            fields = url_str.split()
            if not fields:
                continue
            if len(fields) != 2:
                raise ValueError(
                    "{}:{}: expected 'label url', got {!r}".format(
                        file_path, line_number, url_str.strip()))
            url_label, url = fields
            url_dict[url_label] = url
    return url_dict
def main(output_format, output_path, add_mode, domain_block_files):
    """Download the configured block lists and write them to a file.

    The sources listed in HOST_FILE_URLS and NONHOST_FILE_URLS are
    downloaded and merged, duplicates are dropped (first occurrence
    wins), and one line per domain is written in the chosen format,
    followed by one line per manual domain block where the format
    supports them.

    output_format: key into OUTPUT_DICT / EXTRACT_DICT / DOMAIN_OUTPUT_DICT
    output_path: file to write; if it already exists it is read first
    add_mode: if true, domains already in output_path are kept
    domain_block_files: iterable of paths to files listing whole domains
        to block, one per line
    """
    # Collect manual domain blocks, ignoring blank lines so an empty file
    # does not produce a block for the empty domain.
    domain_blocks = []
    for domain_file in domain_block_files:
        with open(domain_file) as domain_f:
            for line in domain_f:
                line = line.strip()
                if line:
                    domain_blocks.append(line)

    full_domain_list = []
    full_domain_list += process_url_dict(urlfile_to_dict(HOST_FILE_URLS), True)
    full_domain_list += process_url_dict(urlfile_to_dict(NONHOST_FILE_URLS), False)

    # Deduplicate while keeping first-seen order so the output is
    # deterministic from run to run.
    unique_domains = list(dict.fromkeys(full_domain_list))

    print("\nFinished processing")
    print("Got %d entries" % len(full_domain_list))
    print("Of which %d were duplicates" % (len(full_domain_list) - len(unique_domains)))

    existing_list = []
    if os.path.exists(output_path):
        extract_function = EXTRACT_DICT[output_format]
        with open(output_path, "r") as output_f:
            existing_list = [extract_function(line.strip())
                             for line in output_f
                             if line.strip()]

    if add_mode:
        # Keep everything that was already in the output file.
        unique_domains = list(dict.fromkeys(unique_domains + existing_list))

    if domain_blocks:
        print("Got {} domain blocks".format(len(domain_blocks)))

    # Resolve the formatters before opening the output for writing, so a
    # bad format name cannot leave a truncated file behind.
    format_entry = OUTPUT_DICT[output_format]
    format_domain_block = DOMAIN_OUTPUT_DICT[output_format]

    with open(output_path, "w") as output_f:
        output_f.writelines(format_entry(REDIRECT_IP, domain)
                            for domain in unique_domains)

        for domain_block in domain_blocks:
            try:
                output_f.write(format_domain_block(REDIRECT_IP, domain_block))
            except UnsupportedFormat as exc:
                sys.stderr.write(
                    "Not writing domain block for unsupported format {}: {}\n".format(
                        output_format, exc))
                # Every remaining block would fail the same way.
                break

    print("Complete - wrote using %s format to %s" % (output_format, output_path))

    if not add_mode and existing_list:
        new_domains = set(unique_domains)
        old_domains = set(existing_list)
        # Compare as sets: ordering and duplicates are not real changes.
        if new_domains != old_domains:
            print("Added %d entries" % len(new_domains - old_domains))
            print("Removed %d entries" % len(old_domains - new_domains))
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to main().
    parser = argparse.ArgumentParser(
        description='Generate system-level advertising/malware blocklists')
    # nargs='*' produces a list, so the default is a list as well (an
    # omitted flag means "no domain block files").
    parser.add_argument("--domainlist", "-D", dest="domain_blocks", action="store", default=[],
                        help="File to read manual domain blocks from", required=False, nargs='*')
    # Required, so no default is given.
    parser.add_argument("--output", "-o", dest="output_path", action="store",
                        help="Where to write output", required=True)
    parser.add_argument("--format", "-f", dest="output_format", action="store", default="hosts",
                        help="Format to write output using", choices=list(OUTPUT_DICT.keys()))
    parser.add_argument("--add", "-a", dest="add", action="store_true", default=False,
                        help="Don't remove any lines, only add new ones")
    args = parser.parse_args()

    main(args.output_format, args.output_path, args.add, args.domain_blocks)