Attachment 'blocklists-ipset.py'
Download 1 #! /bin/env python3
2
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation, either version 3 of the License
7
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
12
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15
16 import argparse
17 import http.client
18 import ipaddress
19 import socket
20 import ssl
21 import subprocess
22 import sys
23 import tempfile
24
def eprint(*args, **kwargs):
    """
    Forward all arguments to print(), writing to standard error.

    Accepts the same positional and keyword arguments as print(),
    except ``file``, which is always set to sys.stderr here.
    """
    # Look the stream up at call time so a replaced sys.stderr is honoured.
    error_stream = sys.stderr
    print(*args, file=error_stream, **kwargs)
30
class Network:
    """
    Pre-computed integer form of a subnet.

    Attributes:
        network: the subnet's network address as an integer.
        mask: the subnet's netmask as an integer.
    """
    def __init__(self, subnet):
        # Parse once; only the two integers are kept on the instance.
        parsed = ipaddress.ip_network(subnet)
        self.network, self.mask = (
            int(parsed.network_address),
            int(parsed.netmask),
        )
39
class BlocklistsIpset:
    """
    Downloader and updater for ipset based filtering using the "all.txt"
    blocklist from blocklist.de.

    The downloaded addresses are written to ``ipset restore`` files, loaded
    into temporary sets and then swapped with the permanent sets by calling
    the ``ipset`` command line tool. IPv4 and IPv6 addresses are kept in
    separate sets.
    """
    def __init__(self):
        self.blocklists_fqdn = "lists.blocklist.de"
        self.blocklists_file = "/lists/all.txt"

        # Raw lines of the downloaded list; filled by get_list().
        self.blocklist_list = list()
        self.ipset_v4 = list()
        self.ipset_v6 = list()

        # Base names; derive_names() appends the per-family suffix.
        self.temporary_name_template = "blocklists-de-temporary"
        self.permanent_name_template = "blocklists-de-permanent"
        self.verbose = False

    @classmethod
    def write_header(cls, temporary_file_handle, header):
        """
        Write the header line of an ipset file (as consumed by ipset restore).

        temporary_file_handle must be a binary file object; the header is
        written UTF-8 encoded with a trailing newline and flushed.
        """
        temporary_file_handle.write("{}\n".format(header).encode("utf-8"))
        temporary_file_handle.flush()

    @classmethod
    def generate_file_header(cls, name, settype="hash:ip", comment=True, family="inet",
                             hashsize=1024, maxelem=65535):
        """
        Build the "create ..." line for an ipset restore file.

        Returns the line without a trailing newline. When comment is true
        the "comment" keyword is appended to the create line.
        """
        header = "create {} {} family {} hashsize {} maxelem {}".format(
            name, settype, family, hashsize, maxelem)
        if comment:
            header += " comment"
        return header

    @classmethod
    def _run_ipset(cls, arguments):
        """
        Run the ipset tool with the given list of arguments.

        Returns the exit code, or None when the program could not be started.
        The arguments are passed as a list so values containing spaces
        (e.g. file names) stay a single argument.
        """
        try:
            return subprocess.call(["ipset"] + list(arguments))
        except OSError as error:
            eprint("Could not execute ipset: {}".format(error))
            return None

    @classmethod
    def restore_file(cls, filename):
        """
        Load the given filename using ipset restore.

        Returns True on success and False otherwise.
        """
        returncode = cls._run_ipset(["-exist", "-f", filename, "restore"])
        if returncode != 0:
            eprint("Restoring the file {} failed with code {}".format(filename, returncode))
            return False
        return True

    @classmethod
    def derive_names(cls, name):
        """
        Generate the set names for the IPv4 and IPv6 protocol families.

        Returns a (ipv4_name, ipv6_name) tuple.
        """
        ipv4_suffix = "_v4"
        ipv6_suffix = "_v6"
        return "{}{}".format(name, ipv4_suffix), "{}{}".format(name, ipv6_suffix)

    @classmethod
    def destroy_set(cls, set_1):
        """
        Destroy the given set referenced by its name (set_1 argument's value).

        Returns True on success and False otherwise.
        """
        returncode = cls._run_ipset(["destroy", set_1])
        if returncode != 0:
            eprint("Deleting the ipset {} failed with code {}".format(set_1, returncode))
            return False
        return True

    @classmethod
    def swap_sets(cls, set_1, set_2):
        """
        Swap the two given sets in the kernel.

        Returns True on success and False otherwise.
        """
        returncode = cls._run_ipset(["swap", set_1, set_2])
        if returncode != 0:
            eprint("Swapping the ipsets {} and {} failed with code {}".format(
                set_1, set_2, returncode))
            return False
        return True

    def get_list(self):
        """
        Download the blocklist using HTTPS and TLSv1.2 or higher.

        On success the lines of the response body are stored in
        self.blocklist_list and True is returned. Any connection, protocol
        or status error is reported on stderr and False is returned.
        """
        ctx = ssl.create_default_context()
        ctx.options |= ssl.OP_NO_TLSv1
        ctx.options |= ssl.OP_NO_TLSv1_1
        ctx.options |= ssl.OP_NO_COMPRESSION
        # initiate a secure HTTPS connection to get the list
        connection = http.client.HTTPSConnection(
            self.blocklists_fqdn,
            context=ctx, timeout=50)
        try:
            connection.connect()
            connection.request("GET", self.blocklists_file)
            response = connection.getresponse()
            if response.status != 200:
                eprint("Server responded with statuscode {}. Aborting".format(
                    response.status))
                return False
            body = response.read()
        except socket.timeout:
            # Must come before OSError: socket.timeout is a subclass of it.
            eprint("Socket error: Connection timed out.")
            return False
        except OSError as error:
            eprint("Socket error: {}".format(error))
            return False
        except http.client.HTTPException as error:
            eprint("HTTP error: {!r}".format(error))
            return False
        finally:
            connection.close()

        if not body:
            eprint("Server didn't send us any data.")
            return False

        # Undecodable bytes are replaced; such lines fail address parsing
        # later and are skipped instead of aborting the whole update.
        self.blocklist_list = body.decode("utf-8", errors="replace").splitlines()
        return True

    @classmethod
    def _is_excluded(cls, address, networks):
        """
        Return True when address lies inside any of the given networks.

        networks is an iterable of objects exposing integer ``network`` and
        ``mask`` attributes.
        """
        value = int(address)
        return any((value & net.mask) == net.network for net in networks)

    def _split_by_family(self, invalid_v4, invalid_v6):
        """
        Parse self.blocklist_list into accepted IPv4 and IPv6 address strings.

        Blank and unparsable lines are skipped, as are addresses inside the
        given excluded networks. Returns a (ipv4_addresses, ipv6_addresses)
        tuple of lists holding the normalised textual addresses.
        """
        accepted_v4 = []
        accepted_v6 = []
        for line in self.blocklist_list:
            entry = line.strip()
            if not entry:
                continue
            try:
                address = ipaddress.ip_address(entry)
            except ValueError:
                if self.verbose:
                    eprint("Skipping invalid entry {!r}".format(entry))
                continue
            # Scoped IPv6 addresses carry free-form text after '%'; never
            # pass that on into the restore file.
            if getattr(address, "scope_id", None):
                continue
            if address.version == 4:
                excluded, accepted = invalid_v4, accepted_v4
            else:
                excluded, accepted = invalid_v6, accepted_v6
            if not self._is_excluded(address, excluded):
                # Write the parsed form, not the raw remote text.
                accepted.append(str(address))
        return accepted_v4, accepted_v6

    @classmethod
    def _write_set_file(cls, file_handle, header, set_name, addresses):
        """
        Write the header plus one "add" line per address and flush the file.
        """
        cls.write_header(file_handle, header)
        file_handle.writelines(
            "add {} {}\n".format(set_name, address).encode("utf-8")
            for address in addresses)
        file_handle.flush()

    def _replace_set(self, filename, permanent_name, temporary_name):
        """
        Load filename into the temporary set, swap it with the permanent set
        and destroy the temporary set. Returns True when every step worked.
        """
        # NOTE(review): ipset swap presumably needs the permanent set to
        # exist already - confirm against ipset(8).
        # Only swap when the load succeeded, so a failed restore never
        # replaces the contents of the permanent set.
        swapped = (self.restore_file(filename)
                   and self.swap_sets(permanent_name, temporary_name))
        # After a successful swap the temporary name holds the old records;
        # it is destroyed in either case.
        destroyed = self.destroy_set(temporary_name)
        return swapped and destroyed

    def process_blocklist(self):
        """
        Filter the downloaded blocklist for bogons, private, link-local and
        multicast addresses and load the remaining addresses into the sets.

        Returns True when both the IPv4 and the IPv6 set were replaced.
        """
        invalid_v4 = [
            Network(subnet) for subnet in (
                "0.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "172.16.0.0/12",
                "192.168.0.0/16", "169.254.0.0/16", "255.0.0.0/8",
                "224.0.0.0/4",
            )
        ]
        invalid_v6 = [
            Network(subnet) for subnet in (
                "ff00::/8",
                "fe80::/10",
                "fd00::/8",
            )
        ]

        temporary_name_v4, temporary_name_v6 = self.derive_names(
            self.temporary_name_template)
        permanent_name_v4, permanent_name_v6 = self.derive_names(
            self.permanent_name_template)

        addresses_v4, addresses_v6 = self._split_by_family(invalid_v4, invalid_v6)

        # The files must stay open (and thus exist) until ipset has read them.
        with tempfile.NamedTemporaryFile() as temporary_file_v4, \
                tempfile.NamedTemporaryFile() as temporary_file_v6:
            self._write_set_file(
                temporary_file_v4,
                self.generate_file_header(temporary_name_v4),
                temporary_name_v4, addresses_v4)
            self._write_set_file(
                temporary_file_v6,
                self.generate_file_header(temporary_name_v6, family="inet6"),
                temporary_name_v6, addresses_v6)

            loaded_v4 = self._replace_set(
                temporary_file_v4.name, permanent_name_v4, temporary_name_v4)
            loaded_v6 = self._replace_set(
                temporary_file_v6.name, permanent_name_v6, temporary_name_v6)

        if self.verbose:
            print("Loaded {} IPs into the sets".format(
                len(addresses_v4) + len(addresses_v6)))
        return loaded_v4 and loaded_v6

    def run(self):
        """
        Run the argument parser, download and apply the blocklist.

        Exits with status 1 when the download or the set update fails.
        """
        parser = argparse.ArgumentParser(
            description="Updates ipsets with all.txt from blocklist.de.")
        parser.add_argument(
            '-v',
            '--verbose',
            action='store_true',
            help="Enables verbose mode",
            dest="verbose"
        )

        args = parser.parse_args()

        self.verbose = args.verbose

        if not self.get_list():
            sys.exit(1)

        if not self.process_blocklist():
            sys.exit(1)
286
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    BLOCKLISTS = BlocklistsIpset()
    # run() parses the command line, downloads the list and applies it.
    BLOCKLISTS.run()
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily. You are not allowed to attach a file to this page.