From 1831be56580793aa16bfc482add187c60619e1d4 Mon Sep 17 00:00:00 2001 From: sBubshait Date: Tue, 5 Aug 2025 15:40:38 +0300 Subject: [PATCH] feat: base drill server script --- activationDrill/drillServer.py | 329 +++++++++++++++++++++++++++++++++ 1 file changed, 329 insertions(+) create mode 100644 activationDrill/drillServer.py diff --git a/activationDrill/drillServer.py b/activationDrill/drillServer.py new file mode 100644 index 0000000..ae6988e --- /dev/null +++ b/activationDrill/drillServer.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +""" +PDG Activation Drill Server +Manages snapshot cloning and NFS exports for disaster recovery testing. +""" + +import os +import sys +import subprocess +import time +import re +from datetime import datetime, timedelta +from typing import List, Dict, Optional, Tuple + +class Colors: + """ANSI color codes for terminal styling""" + HEADER = '\033[95m' + BLUE = '\033[94m' + CYAN = '\033[96m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + RED = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + DIM = '\033[2m' + +class ActivationDrillServer: + def __init__(self): + self.clone_prefix = "PDGdrill-" + self.nfs_exports_file = "/etc/exports" + + def display_banner(self): + """Display the Activation Drill Server banner""" + banner = f""" +{Colors.CYAN}{Colors.BOLD} + ██████╗ ██████╗ ██████╗ + ██╔══██╗██╔══██╗██╔════╝ + ██████╔╝██║ ██║██║ ███╗ + ██╔═══╝ ██║ ██║██║ ██║ + ██║ ██████╔╝╚██████╔╝ + ╚═╝ ╚═════╝ ╚═════╝ +{Colors.ENDC} +{Colors.DIM} │ Drill Activation (Server) │{Colors.ENDC} +""" + print(banner) + + def run_command(self, cmd: str, capture_output: bool = True) -> Tuple[bool, str]: + """Execute shell command and return success status and output""" + try: + result = subprocess.run( + cmd, shell=True, capture_output=capture_output, + text=True, check=False + ) + return result.returncode == 0, result.stdout + result.stderr + except Exception as e: + return False, str(e) + + def get_zfs_datasets(self) -> List[str]: + """Retrieve available ZFS datasets""" + success, output = self.run_command("zfs list -H -o name -t filesystem") + if not success: + return [] + return [line.strip() for line in output.split('\n') if line.strip()] + + def get_dataset_snapshots(self, dataset: str) -> List[Dict]: + """Get all snapshots for a dataset with timestamps""" + success, output = self.run_command(f"zfs list -H -o name,creation -t snapshot | grep '^{dataset}@'") + if not success: + return [] + + snapshots = [] + for line in output.split('\n'): + if line.strip(): + parts = line.split('\t') + if len(parts) >= 2: + snap_name = parts[0].strip() + creation = parts[1].strip() + + # Extract timestamp from snapshot name (assuming default format) + snap_suffix = snap_name.split('@')[1] + snapshots.append({ + 'name': snap_name, + 'suffix': snap_suffix, + 'creation': creation + }) + + # Sort by creation time (newest first) + snapshots.sort(key=lambda x: x['creation'], reverse=True) + return snapshots + + def show_loading(self, message: str, duration: float = 2.0): + """Display a loading animation""" + chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" + end_time = time.time() + duration + + while time.time() < end_time: + for char in chars: + print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True) + time.sleep(0.1) + if time.time() >= end_time: + break + print(f"\r{' ' * (len(message) + 2)}\r", end="") + + def select_dataset(self) -> Optional[str]: + """Interactive dataset selection""" + datasets = self.get_zfs_datasets() + if not datasets: + print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}") + return None + + # Filter out PDGdrill clones from the list + datasets = [d for d in datasets if not d.split('/')[-1].startswith(self.clone_prefix)] + + if len(datasets) > 100: + print(f"\n{Colors.YELLOW}Found {len(datasets)} datasets - too many to list{Colors.ENDC}") + print(f"{Colors.DIM}Tip: Use 'zfs list' to see all available datasets{Colors.ENDC}") + + while True: + dataset = input(f"\n{Colors.BOLD}Enter dataset name: {Colors.ENDC}").strip() + if not dataset: + return None + + success, _ = self.run_command(f"zfs list {dataset}") + if success: + return dataset + else: + print(f"{Colors.RED}✗ Dataset '{dataset}' not found{Colors.ENDC}") + retry = input(f"{Colors.DIM}Try again? (y/n): {Colors.ENDC}").lower() + if retry != 'y': + return None + else: + print(f"\n{Colors.BOLD}Available Services (Datasets):{Colors.ENDC}") + for i, dataset in enumerate(datasets, 1): + print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}") + + while True: + try: + choice = input(f"\n{Colors.BOLD}Select service (1-{len(datasets)}): {Colors.ENDC}") + idx = int(choice) - 1 + if 0 <= idx < len(datasets): + return datasets[idx] + print(f"{Colors.RED}Invalid selection{Colors.ENDC}") + except (ValueError, KeyboardInterrupt): + return None + + def select_snapshot_by_date(self, dataset: str) -> Optional[str]: + """Interactive snapshot selection by date""" + snapshots = self.get_dataset_snapshots(dataset) + if not snapshots: + print(f"{Colors.RED}✗ No snapshots found for {dataset}{Colors.ENDC}") + return None + + # Group snapshots by date + date_groups = {} + for snap in snapshots: + # Extract date from snapshot suffix (assuming YYYY-MM-DD_HH-MM-SS format) + try: + date_part = snap['suffix'].split('_')[0] if '_' in snap['suffix'] else snap['suffix'] + if len(date_part) >= 10: # YYYY-MM-DD + date_key = date_part[:10] + if date_key not in date_groups: + date_groups[date_key] = [] + date_groups[date_key].append(snap) + except: + continue + + if not date_groups: + print(f"{Colors.RED}✗ No properly formatted snapshots found{Colors.ENDC}") + return None + + dates = sorted(date_groups.keys(), reverse=True) + earliest_date = min(dates) + latest_date = max(dates) + + print(f"\n{Colors.BOLD}📅 Snapshot Date Selection{Colors.ENDC}") + print(f"{Colors.DIM}({len(snapshots)}) Snapshots available from {earliest_date} to {latest_date}{Colors.ENDC}") + + # Show available dates + print(f"\n{Colors.BOLD}Available Dates:{Colors.ENDC}") + for i, date in enumerate(dates, 1): + count = len(date_groups[date]) + print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {date} ({count} snapshot{'s' if count > 1 else ''})") + + # Select date + while True: + try: + choice = input(f"\n{Colors.BOLD}Select date (1-{len(dates)}): {Colors.ENDC}") + idx = int(choice) - 1 + if 0 <= idx < len(dates): + selected_date = dates[idx] + break + print(f"{Colors.RED}Invalid selection{Colors.ENDC}") + except (ValueError, KeyboardInterrupt): + return None + + # Show snapshots for selected date + day_snapshots = date_groups[selected_date] + print(f"\n{Colors.BOLD}Snapshots for {selected_date}:{Colors.ENDC}") + + for i, snap in enumerate(day_snapshots, 1): + time_part = "Unknown" + if '_' in snap['suffix']: + time_part = snap['suffix'].split('_')[1].replace('-', ':') + print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {time_part} - {snap['creation']}") + + # Select specific snapshot + while True: + try: + choice = input(f"\n{Colors.BOLD}Select snapshot (1-{len(day_snapshots)}): {Colors.ENDC}") + idx = int(choice) - 1 + if 0 <= idx < len(day_snapshots): + selected_snap = day_snapshots[idx]['name'] + + # Confirm selection + print(f"\n{Colors.YELLOW}Selected snapshot: {selected_snap}{Colors.ENDC}") + confirm = input(f"{Colors.BOLD}Proceed with this snapshot? (y/N): {Colors.ENDC}") + if confirm.lower() == 'y': + return selected_snap + else: + return self.select_snapshot_by_date(dataset) # Start over + print(f"{Colors.RED}Invalid selection{Colors.ENDC}") + except (ValueError, KeyboardInterrupt): + return None + + def clone_snapshot(self, snapshot: str, dataset: str) -> Tuple[bool, str]: + """Clone snapshot to PDGdrill naming""" + dataset_name = dataset.split('/')[-1] + clone_name = f"{'/'.join(dataset.split('/')[:-1])}/{self.clone_prefix}{dataset_name}" if '/' in dataset else f"{self.clone_prefix}{dataset_name}" + + # Check if clone already exists and destroy it + success, _ = self.run_command(f"zfs list {clone_name}") + if success: + print(f"{Colors.YELLOW}Existing drill clone found, removing...{Colors.ENDC}") + destroy_success, destroy_msg = self.run_command(f"zfs destroy {clone_name}") + if not destroy_success: + return False, f"Failed to remove existing clone: {destroy_msg}" + + # Create new clone + success, output = self.run_command(f"zfs clone {snapshot} {clone_name}") + if success: + return True, f"Clone created: {clone_name}" + else: + return False, f"Failed to create clone: {output}" + + def update_nfs_exports(self, clone_name: str) -> Tuple[bool, str]: + """Add clone to NFS exports if not already present""" + # Check if clone is already in exports + if os.path.exists(self.nfs_exports_file): + with open(self.nfs_exports_file, 'r') as f: + content = f.read() + if clone_name in content: + return True, "NFS export already exists" + + # Add to exports + mount_point = f"/{clone_name.split('/')[-1]}" + export_line = f"{mount_point} *(rw,sync,no_root_squash,no_subtree_check)\n" + + try: + with open(self.nfs_exports_file, 'a') as f: + f.write(export_line) + + # Reload NFS exports + success, output = self.run_command("exportfs -ra") + if success: + return True, f"NFS export added: {mount_point}" + else: + return False, f"Failed to reload NFS exports: {output}" + except Exception as e: + return False, f"Failed to update exports file: {str(e)}" + + def run_activation_drill(self): + """Main activation drill workflow""" + print(f"\n{Colors.BOLD}🔄 Starting Activation Drill{Colors.ENDC}") + + # Select service/dataset + dataset = self.select_dataset() + if not dataset: + return + + # Select snapshot + snapshot = self.select_snapshot_by_date(dataset) + if not snapshot: + return + + print(f"\n{Colors.YELLOW}Preparing activation drill...{Colors.ENDC}") + + # Clone snapshot + self.show_loading("Creating drill clone") + success, message = self.clone_snapshot(snapshot, dataset) + + if not success: + print(f"{Colors.RED}✗ {message}{Colors.ENDC}") + return + + clone_name = message.split(': ')[1] # Extract clone name from success message + print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}") + + # Update NFS exports + self.show_loading("Updating NFS exports") + success, message = self.update_nfs_exports(clone_name) + + if success: + print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}") + else: + print(f"{Colors.YELLOW}⚠ {message}{Colors.ENDC}") + + print(f"\n{Colors.GREEN}🎉 Activation drill ready!{Colors.ENDC}") + print(f"{Colors.DIM}Clone: {clone_name}{Colors.ENDC}") + print(f"{Colors.DIM}Clients can now connect and mount the drill dataset{Colors.ENDC}") + + def run(self): + """Main application loop""" + try: + os.system('clear' if os.name == 'posix' else 'cls') + self.display_banner() + self.run_activation_drill() + + except KeyboardInterrupt: + print(f"\n\n{Colors.DIM}Operation cancelled{Colors.ENDC}") + sys.exit(0) + +if __name__ == "__main__": + # Check if running as root + if os.geteuid() != 0: + print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}") + print(f"{Colors.DIM}NFS export updates may require sudo{Colors.ENDC}") + + server = ActivationDrillServer() + server.run() \ No newline at end of file