#!/usr/bin/env python3 """ Файрволл на Python с использованием iptables и блокировкой по доменам """ import subprocess import os import sys import socket import threading import time from datetime import datetime import dns.resolver class IptablesFirewall: def __init__(self): self.chain_name = "PYTHON_FIREWALL" self.blocked_domains = set() self.domain_to_ip_cache = {} self.dns_cache_ttl = 300 # 5 минут TTL для кэша self.is_monitoring = False self.monitor_thread = None self.log_file = "iptables_firewall.log" # Настройка DNS резолвера self.dns_resolver = dns.resolver.Resolver() self.dns_resolver.timeout = 2 self.dns_resolver.lifetime = 2 def run_command(self, cmd): """Выполнение команды iptables""" try: result = subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True) return True except subprocess.CalledProcessError as e: print(f"Error executing command: {cmd}") print(f"Error output: {e.stderr}") return False def log_event(self, message): """Логирование событий""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_message = f"[{timestamp}] {message}" print(log_message) with open(self.log_file, 'a') as f: f.write(log_message + "\n") def initialize(self): """Инициализация цепочки файрволла""" # Очистка старых правил если они есть self.cleanup() # Создание пользовательской цепочки self.run_command(f"iptables -N {self.chain_name}") # Переход к нашей цепочке из INPUT и OUTPUT self.run_command(f"iptables -I INPUT -j {self.chain_name}") self.run_command(f"iptables -I OUTPUT -j {self.chain_name}") self.run_command(f"iptables -I FORWARD -j {self.chain_name}") # Разрешить localhost трафик self.run_command(f"iptables -A {self.chain_name} -s 127.0.0.1 -d 127.0.0.1 -j ACCEPT") self.log_event("Firewall initialized") def add_rule(self, protocol=None, source_ip=None, dest_ip=None, dest_port=None, action="DROP"): """Добавление правила в iptables""" cmd = f"iptables -A {self.chain_name}" if protocol: cmd += f" -p {protocol}" if source_ip: cmd += f" -s {source_ip}" if dest_ip: cmd += f" -d {dest_ip}" if dest_port: cmd += f" --dport {dest_port}" cmd += f" -j {action}" success = self.run_command(cmd) if success: self.log_event(f"Rule added: {cmd}") return success def remove_rule(self, protocol=None, source_ip=None, dest_ip=None, dest_port=None, action="DROP"): """Удаление правила из iptables""" cmd = f"iptables -D {self.chain_name}" if protocol: cmd += f" -p {protocol}" if source_ip: cmd += f" -s {source_ip}" if dest_ip: cmd += f" -d {dest_ip}" if dest_port: cmd += f" --dport {dest_port}" cmd += f" -j {action}" success = self.run_command(cmd) if success: self.log_event(f"Rule removed: {cmd}") return success def block_ip(self, ip_address): """Блокировка IP-адреса""" # Блокировка входящего и исходящего трафика success1 = self.add_rule(source_ip=ip_address, action="DROP") success2 = self.add_rule(dest_ip=ip_address, action="DROP") if success1 and success2: self.log_event(f"IP blocked: {ip_address}") return True return False def unblock_ip(self, ip_address): """Разблокировка IP-адреса""" # Удаление правил блокировки success1 = self.remove_rule(source_ip=ip_address, action="DROP") success2 = self.remove_rule(dest_ip=ip_address, action="DROP") if success1 and success2: self.log_event(f"IP unblocked: {ip_address}") return True return False def allow_port(self, port, protocol="tcp"): """Разрешение порта""" return self.add_rule(protocol=protocol, dest_port=port, action="ACCEPT") def block_port(self, port, protocol="tcp"): """Блокировка порта""" return self.add_rule(protocol=protocol, dest_port=port, action="DROP") def resolve_domain(self, domain): """Разрешение домена в IP-адреса""" domain = domain.lower().strip() # Проверка кэша if domain in self.domain_to_ip_cache: cache_time, ips = self.domain_to_ip_cache[domain] if time.time() - cache_time < self.dns_cache_ttl: return ips try: answers = self.dns_resolver.resolve(domain, 'A') ips = {str(answer) for answer in answers} # Обновление кэша self.domain_to_ip_cache[domain] = (time.time(), ips) self.log_event(f"DNS resolved: {domain} -> {', '.join(ips)}") return ips except Exception as e: self.log_event(f"DNS resolution failed for {domain}: {str(e)}") return set() def block_domain(self, domain): """Блокировка домена""" domain = domain.lower().strip() if domain in self.blocked_domains: self.log_event(f"Domain already blocked: {domain}") return True # Разрешаем домен в IP-адреса ips = self.resolve_domain(domain) if not ips: self.log_event(f"Warning: No IP addresses found for domain {domain}") return False # Блокируем все IP-адреса домена success_count = 0 for ip in ips: if self.block_ip(ip): success_count += 1 if success_count > 0: self.blocked_domains.add(domain) self.log_event(f"Domain blocked: {domain} ({success_count} IPs)") return True return False def unblock_domain(self, domain): """Разблокировка домена""" domain = domain.lower().strip() if domain not in self.blocked_domains: self.log_event(f"Domain not blocked: {domain}") return True # Получаем IP-адреса из кэша или разрешаем заново ips = set() if domain in self.domain_to_ip_cache: cache_time, cached_ips = self.domain_to_ip_cache[domain] ips = cached_ips else: ips = self.resolve_domain(domain) # Разблокируем все IP-адреса домена success_count = 0 for ip in ips: if self.unblock_ip(ip): success_count += 1 # Удаляем домен из списка блокировок self.blocked_domains.discard(domain) self.log_event(f"Domain unblocked: {domain} ({success_count} IPs)") return True def start_domain_monitoring(self, interval=60): """Запуск мониторинга доменов для обновления IP-адресов""" def monitor_loop(): self.log_event("Domain monitoring started") while self.is_monitoring: try: self.update_domain_ips() time.sleep(interval) except Exception as e: self.log_event(f"Error in domain monitoring: {e}") time.sleep(interval) self.is_monitoring = True self.monitor_thread = threading.Thread(target=monitor_loop) self.monitor_thread.daemon = True self.monitor_thread.start() def stop_domain_monitoring(self): """Остановка мониторинга доменов""" self.is_monitoring = False if self.monitor_thread: self.monitor_thread.join(timeout=5) self.log_event("Domain monitoring stopped") def update_domain_ips(self): """Обновление IP-адресов для заблокированных доменов""" if not self.blocked_domains: return self.log_event("Updating domain IP addresses...") for domain in list(self.blocked_domains): old_ips = set() if domain in self.domain_to_ip_cache: cache_time, cached_ips = self.domain_to_ip_cache[domain] old_ips = cached_ips # Разрешаем домен заново new_ips = self.resolve_domain(domain) if not new_ips: continue # Находим новые IP-адреса для блокировки ips_to_block = new_ips - old_ips # Находим старые IP-адреса для разблокировки ips_to_unblock = old_ips - new_ips # Блокируем новые IP-адреса for ip in ips_to_block: self.block_ip(ip) # Разблокируем старые IP-адреса for ip in ips_to_unblock: self.unblock_ip(ip) if ips_to_block or ips_to_unblock: self.log_event(f"Domain {domain} updated: +{len(ips_to_block)} -{len(ips_to_unblock)} IPs") def show_status(self): """Показать статус файрволла""" print("\n=== Firewall Status ===") print(f"Blocked domains: {len(self.blocked_domains)}") print(f"Cached domains: {len(self.domain_to_ip_cache)}") # Показать правила iptables print("\nCurrent rules:") os.system(f"iptables -L {self.chain_name} -n") if self.blocked_domains: print("\nBlocked domains:") for domain in sorted(self.blocked_domains): if domain in self.domain_to_ip_cache: cache_time, ips = self.domain_to_ip_cache[domain] ips_str = ', '.join(ips) print(f" {domain} -> {ips_str}") else: print(f" {domain} -> [not resolved]") def cleanup(self): """Очистка правил""" # Удаляем ссылки на нашу цепочку self.run_command(f"iptables -D INPUT -j {self.chain_name} 2>/dev/null || true") self.run_command(f"iptables -D OUTPUT -j {self.chain_name} 2>/dev/null || true") self.run_command(f"iptables -D FORWARD -j {self.chain_name} 2>/dev/null || true") # Очищаем цепочку self.run_command(f"iptables -F {self.chain_name} 2>/dev/null || true") self.run_command(f"iptables -X {self.chain_name} 2>/dev/null || true") self.stop_domain_monitoring() self.log_event("Firewall cleaned up") def load_domains_from_file(filename): """Загрузка доменов из файла""" domains = [] try: with open(filename, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): domains.append(line) except FileNotFoundError: print(f"File {filename} not found") return domains def interactive_menu(): """Интерактивное меню управления файрволлом""" fw = IptablesFirewall() try: fw.initialize() while True: print("\n=== iptables Firewall with Domain Blocking ===") print("1. Block domain") print("2. Unblock domain") print("3. Block IP") print("4. Block port") print("5. Allow port") print("6. Show status") print("7. Start domain monitoring") print("8. Stop domain monitoring") print("9. Load domains from file") print("10. Exit") choice = input("Select option: ").strip() if choice == '1': domain = input("Enter domain to block: ").strip() if domain: fw.block_domain(domain) elif choice == '2': domain = input("Enter domain to unblock: ").strip() if domain: fw.unblock_domain(domain) elif choice == '3': ip = input("Enter IP to block: ").strip() if ip: fw.block_ip(ip) elif choice == '4': port = input("Enter port to block: ").strip() if port.isdigit(): fw.block_port(int(port)) elif choice == '5': port = input("Enter port to allow: ").strip() if port.isdigit(): fw.allow_port(int(port)) elif choice == '6': fw.show_status() elif choice == '7': fw.start_domain_monitoring() print("Domain monitoring started") elif choice == '8': fw.stop_domain_monitoring() print("Domain monitoring stopped") elif choice == '9': filename = input("Enter filename: ").strip() domains = load_domains_from_file(filename) for domain in domains: fw.block_domain(domain) print(f"Loaded {len(domains)} domains from {filename}") elif choice == '10': break else: print("Invalid option") finally: fw.cleanup() # Пример использования if __name__ == "__main__": if os.geteuid() != 0: print("Requires root privileges") sys.exit(1) # Проверка наличия dnspython try: import dns.resolver except ImportError: print("Error: dnspython library required. Install with: pip install dnspython") sys.exit(1) if len(sys.argv) > 1 and sys.argv[1] == "--interactive": interactive_menu() else: # Автоматическая настройка с примером блокировки доменов fw = IptablesFirewall() try: fw.initialize() # Базовые правила fw.block_port(23) # Блокировка Telnet fw.allow_port(22) # Разрешение SSH fw.block_port(80) # Разрешение HTTP fw.block_port(443) # Разрешение HTTPS # Блокировка доменов (пример) fw.block_domain("example.com") fw.block_domain("test.org") # Запуск мониторинга доменов fw.start_domain_monitoring() print("Firewall rules applied with domain blocking") print("Domain monitoring is running...") print("Press Ctrl+C to stop") # Бесконечный цикл для поддержания работы try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down...") finally: fw.cleanup() print("Firewall rules cleaned up")