Attachment 'blocklists-ipset.py'

Download

   1 #! /bin/env python3
   2 
   3 #
   4 # This program is free software: you can redistribute it and/or modify
   5 # it under the terms of the GNU General Public License as published by
   6 # the Free Software Foundation, either version 3 of the License
   7 
   8 # This program is distributed in the hope that it will be useful,
   9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  11 # GNU General Public License for more details.
  12 
  13 # You should have received a copy of the GNU General Public License
  14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
  15 
  16 import argparse
  17 import http.client
  18 import ipaddress
  19 import socket
  20 import ssl
  21 import subprocess
  22 import sys
  23 import tempfile
  24 
  25 def eprint(*args, **kwargs):
  26     """
  27     Print to stderr
  28     """
  29     print(*args, file=sys.stderr, **kwargs)
  30 
  31 class Network:
  32     """
  33     Container class for caching a subnet's network address and mask
  34     """
  35     def __init__(self, subnet):
  36         network = ipaddress.ip_network(subnet)
  37         self.network = int(network.network_address)
  38         self.mask = int(network.netmask)
  39 
  40 class BlocklistsIpset:
  41     """
  42     Implements a downloader and updater for IPset based filtering
  43     using the "all.txt" blocklist from blocklist.de
  44     Uses ipset on cmdline and uses near atomic swapping of new and old sets.
  45     Supports IPv4 and IPv6.
  46     """
  47     def __init__(self):
  48         self.blocklists_fqdn = "lists.blocklist.de"
  49         self.blocklists_file = "/lists/all.txt"
  50 
  51         self.blocklist_list = list()
  52         self.ipset_v4 = list()
  53         self.ipset_v6 = list()
  54 
  55         self.temporary_name_template = "blocklists-de-temporary"
  56         self.permanent_name_template = "blocklists-de-permanent"
  57         self.verbose = False
  58 
  59     @classmethod
  60     def write_header(cls, temporary_file_handle, header):
  61         """
  62         Write the right file header for ipset files (used using ipsec restore).
  63         """
  64         temporary_file_handle.write(bytearray("{}\n".format(header), 'utf-8'))
  65         temporary_file_handle.flush()
  66 
  67     @classmethod
  68     def generate_file_header(cls, name, settype="hash:ip", comment=True, family="inet",
  69                              hashsize=1024, maxelem=65535):
  70         """
  71         Generate the correct headers for the IPset files.
  72         """
  73         format_string = None
  74         if comment:
  75             format_string = "create {} {} family {} hashsize {} maxelem {} comment"
  76         else:
  77             format_string = "create {} {} family {} hashsize {} maxelem {}"
  78         return format_string.format(name, settype, family, hashsize, maxelem)
  79 
  80     @classmethod
  81     def restore_file(cls, filename):
  82         """
  83         Load the given filename using ipset restore.
  84         """
  85         cmd = "ipset -exist -f {} restore".format(filename).split(" ")
  86 
  87         process = subprocess.Popen(cmd)
  88         process.wait()
  89 
  90         if process.returncode != 0:
  91             print("Restoring the file {} failed with code {}".format(filename, process.returncode))
  92             return False
  93         return False
  94 
  95     @classmethod
  96     def derive_names(cls, name):
  97         """
  98         Generate the names for the IPsets for the IPv4 and IPv6 protocol families
  99         """
 100         ipv4_suffix = "_v4"
 101         ipv6_suffix = "_v6"
 102         return "{}{}".format(name, ipv4_suffix), "{}{}".format(name, ipv6_suffix)
 103 
 104     @classmethod
 105     def destroy_set(cls, set_1):
 106         """
 107         Destroy the given set referenced by its name (set_1 argument's value).
 108         """
 109         cmd = "ipset destroy {}".format(set_1).split(" ")
 110 
 111         process = subprocess.Popen(cmd)
 112         process.wait()
 113         if process.returncode != 0:
 114             print("Deleting the ipset {} failed with code {}".format(set_1, process.returncode))
 115             return False
 116         return True
 117 
 118     @classmethod
 119     def swap_sets(cls, set_1, set_2):
 120         """
 121         Swap the two given sets in the kernel.
 122         """
 123         cmd = "ipset swap {} {}".format(set_1, set_2).split(" ")
 124 
 125         process = subprocess.Popen(cmd)
 126         process.wait()
 127         if process.returncode != 0:
 128             eprint("Swapping the ipsets {} and {} failed with code {}".format(
 129                 set_1, set_2, process.returncode))
 130             return False
 131         return True
 132 
 133     def get_list(self):
 134         """
 135         Download the blocklist using HTTPS and TLSv1.2 or higher.
 136         """
 137         ctx = ssl.create_default_context()
 138         ctx.options |= ssl.OP_NO_TLSv1
 139         ctx.options |= ssl.OP_NO_TLSv1_1
 140         ctx.options |= ssl.OP_NO_COMPRESSION
 141         # initiate a secure HTTPS connection to get the list
 142         connection = http.client.HTTPSConnection(
 143             self.blocklists_fqdn,
 144             context=ctx, timeout=50)
 145         try:
 146             connection.connect()
 147         except:
 148             eprint("Error while connecting.")
 149 
 150         try:
 151             connection.request("GET", self.blocklists_file)
 152         except socket.error as e:
 153             eprint("Socket error: {}".format(e))
 154             return False
 155         except socket.timeout as timeout:
 156             eprint("Socket error: Connection timed out.")
 157             return False
 158 
 159         response = connection.getresponse()
 160 
 161         if response.status != 200:
 162             eprint("Server responded with statuscode {}. Aborting".format(response.statuscode))
 163             return False
 164 
 165         body = response.read()
 166         if not body:
 167             eprint("Server didn't send us any data.")
 168             return False
 169 
 170         self.blocklist_list = body.decode().split("\n")
 171         return True
 172 
 173     def process_blocklist(self):
 174         """
 175         Filter the downloaded blacklist for bogons, private, link-local and multicast addresses
 176         """
 177         number_of_ips = 0
 178         # set up IPv4 and IPv6 temporary files
 179         invalid_v4 = list()
 180         for i in [
 181                 "0.0.0.0/8", "10.0.0.0/8", "100.64.0.0/10", "172.16.0.0/12",
 182                 "192.168.0.0/16", "169.254.0.0/16", "255.0.0.0/8",
 183                 "224.0.0.0/4"
 184             ]:
 185             invalid_v4.append(Network(i))
 186 
 187         invalid_v6 = list()
 188         for i in [
 189                 "ff00::/8",
 190                 "fe80::/10",
 191                 "fd00::/8"
 192             ]:
 193             invalid_v6.append(Network(i))
 194 
 195         temporary_file_v4 = tempfile.NamedTemporaryFile()
 196         temporary_file_v6 = tempfile.NamedTemporaryFile()
 197 
 198         add_template = "add {} {}\n"
 199 
 200         temporary_name_v4, temporary_name_v6 = self.derive_names(
 201             self.temporary_name_template)
 202 
 203         permanent_name_v4, permanent_name_v6 = self.derive_names(
 204             self.permanent_name_template)
 205 
 206         self.write_header(
 207             temporary_file_v4.file,
 208             self.generate_file_header(
 209                 temporary_name_v4))
 210 
 211         self.write_header(
 212             temporary_file_v6.file,
 213             self.generate_file_header(
 214                 temporary_name_v6,
 215                 family="inet6"))
 216 
 217         # write the formatted IPsec record into the corresponding file
 218         for i in self.blocklist_list:
 219 
 220             # check if it's IPv4
 221             if i.find(".") != -1:
 222                 # check if it's in a private subnet
 223                 for j in invalid_v4:
 224                     if (int(ipaddress.ip_address(i)) & j.mask) == j.network:
 225                         continue
 226                 temporary_file_v4.file.write(bytearray(
 227                     add_template.format(temporary_name_v4, i), 'utf-8'))
 228                 number_of_ips += 1
 229             # else it's IPv6
 230             else:
 231                 # check if it's in a private subnet
 232                 for j in invalid_v6:
 233                     if (int(ipaddress.ip_address(i)) & j.mask) == j.network:
 234                         continue
 235                 temporary_file_v6.file.write(bytearray(
 236                     add_template.format(temporary_name_v6, i), 'utf-8'))
 237                 number_of_ips += 1
 238 
 239         temporary_file_v4.file.flush()
 240         temporary_file_v6.file.flush()
 241         # IPv4
 242         # load the new records into the new set
 243         self.restore_file(temporary_file_v4.name)
 244 
 245         # swap the set
 246         self.swap_sets(permanent_name_v4, temporary_name_v4)
 247 
 248         # destroy the old set
 249         self.destroy_set(temporary_name_v4)
 250 
 251         # IPv6
 252         # load the new records into the new set
 253         self.restore_file(temporary_file_v6.name)
 254 
 255         # swap the set
 256         self.swap_sets(permanent_name_v6, temporary_name_v6)
 257 
 258         # destroy the old set
 259         self.destroy_set(temporary_name_v6)
 260 
 261         if self.verbose:
 262             print("Loaded {} IPs into the sets".format(number_of_ips))
 263 
 264     def run(self):
 265         """
 266         Run the argument parser, download and apply the blocklist.
 267         """
 268         parser = argparse.ArgumentParser(
 269             description="Updates ipsets with all.txt from blocklist.de.")
 270         parser.add_argument(
 271             '-v',
 272             '--verbose',
 273             action='store_true',
 274             help="Enables verbose mode",
 275             dest="verbose"
 276         )
 277 
 278         args = parser.parse_args()
 279 
 280         self.verbose = args.verbose
 281 
 282         if not self.get_list():
 283             sys.exit(1)
 284 
 285         self.process_blocklist()
 286 
 287 if __name__ == '__main__':
 288     BLOCKLISTS = BlocklistsIpset()
 289     BLOCKLISTS.run()

Attached Files

To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.
  • [get | view] (2022-09-14 14:22:21, 9.1 KB) [[attachment:blocklists-ipset.py]]
 All files | Selected Files: delete move to page copy to page

You are not allowed to attach a file to this page.