#!/usr/bin/env python3
"""
PDG Activation Drill Server

Manages snapshot cloning and NFS exports for disaster recovery testing.
"""

import os
import sys
import subprocess
import time
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Sequence, Tuple, Union

# Index 0 is a placeholder so that a month number indexes its own name.
_MONTH_NAMES = (
    "", "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
)

# Timestamp embedded in a PDG snapshot suffix: YYYY-MM-DD_HH-MM-SS.
_SNAPSHOT_TIMESTAMP_RE = re.compile(
    r'(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})'
)

# Layout of the human-readable `creation` column printed by `zfs list`
# (e.g. "Wed Jan  3 10:15 2024").  Whitespace in a strptime format
# matches any run of whitespace, so padded day/hour fields parse too.
# NOTE(review): weekday/month names are locale dependent - confirm the
# tool always runs with an English/C locale.
_CREATION_FORMAT = "%a %b %d %H:%M %Y"


class Colors:
    """ANSI color codes for terminal styling"""
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    DIM = '\033[2m'


class ActivationDrillServer:
    """Interactive workflow: pick a dataset, pick a snapshot, clone, export.

    Attributes:
        clone_prefix: Prefix put on the last path component of a dataset
            name to form its drill clone name.
        nfs_exports_file: Path of the NFS exports file that gets appended to.
    """

    # Above this many datasets the menu is replaced by free-text entry.
    _MAX_LISTED_DATASETS = 100

    # "PDG" in block letters.  The column spacing is rebuilt from the
    # glyph shapes; trailing blanks are omitted.
    _BANNER_ROWS = (
        "██████╗ ██████╗  ██████╗",
        "██╔══██╗██╔══██╗██╔════╝",
        "██████╔╝██║  ██║██║  ███╗",
        "██╔═══╝ ██║  ██║██║   ██║",
        "██║     ██████╔╝╚██████╔╝",
        "╚═╝     ╚═════╝  ╚═════╝",
    )

    def __init__(self):
        self.clone_prefix = "PDGdrill-"
        self.nfs_exports_file = "/etc/exports"

    # ------------------------------------------------------------------
    # Terminal helpers
    # ------------------------------------------------------------------

    def display_banner(self):
        """Display the Activation Drill Server banner"""
        art = "\n".join(f"    {row}" for row in self._BANNER_ROWS)
        banner = (
            f"\n{Colors.CYAN}{Colors.BOLD}\n{art}\n{Colors.ENDC}\n"
            f"{Colors.DIM}    │ Drill Activation (Server) │{Colors.ENDC}\n"
        )
        print(banner)

    def show_loading(self, message: str, duration: float = 2.0):
        """Display a spinner next to *message* for about *duration* seconds.

        The call blocks for the whole duration; it does not track any
        real work.  The line is blanked again before returning.
        """
        chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
        end_time = time.time() + duration

        while time.time() < end_time:
            for char in chars:
                print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True)
                time.sleep(0.1)
                if time.time() >= end_time:
                    break

        # Overwrite spinner + message with blanks and return the cursor.
        print(f"\r{' ' * (len(message) + 2)}\r", end="")

    def _prompt_index(self, label: str, count: int) -> Optional[int]:
        """Ask for a number in 1..count and return it as a 0-based index.

        An out-of-range number re-prompts.  Anything that is not an
        integer (including an empty line), Ctrl-C, or end of input
        cancels and returns None.
        """
        while True:
            try:
                choice = input(f"\n{Colors.BOLD}Select {label} (1-{count}): {Colors.ENDC}")
                idx = int(choice) - 1
            except (ValueError, KeyboardInterrupt, EOFError):
                return None
            if 0 <= idx < count:
                return idx
            print(f"{Colors.RED}Invalid selection{Colors.ENDC}")

    @staticmethod
    def _month_label(month: str) -> str:
        """Return the English month name for a numeric string like '03'.

        Values outside 1..12 are returned unchanged.
        """
        number = int(month)
        return _MONTH_NAMES[number] if 1 <= number <= 12 else month

    # ------------------------------------------------------------------
    # Command execution and ZFS queries
    # ------------------------------------------------------------------

    def run_command(self, cmd: Union[str, Sequence[str]],
                    capture_output: bool = True) -> Tuple[bool, str]:
        """Execute a command and return (success, combined output).

        Args:
            cmd: A string is run through the shell, exactly as before.
                A list/tuple of arguments is executed directly without a
                shell, so its elements need no quoting - use this form
                for anything containing user-supplied text.
            capture_output: When False the child writes straight to the
                terminal and the returned output is an empty string.

        Returns:
            (True, output) when the exit status is 0, otherwise
            (False, output).  Output is stdout followed by stderr.  If
            the process cannot be started at all, (False, error text).
        """
        try:
            result = subprocess.run(
                cmd,
                shell=isinstance(cmd, str),
                capture_output=capture_output,
                text=True,
                check=False,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            return False, str(e)
        # stdout/stderr are None when output was not captured.
        return result.returncode == 0, (result.stdout or "") + (result.stderr or "")

    def get_zfs_datasets(self) -> List[str]:
        """Retrieve available ZFS filesystem names (empty list on failure)."""
        success, output = self.run_command(
            ["zfs", "list", "-H", "-o", "name", "-t", "filesystem"]
        )
        if not success:
            return []
        return [line.strip() for line in output.splitlines() if line.strip()]

    def _dataset_exists(self, name: str) -> bool:
        """Return True when `zfs list <name>` succeeds."""
        # A leading '-' would be parsed by zfs as an option, not a name.
        if name.startswith('-'):
            return False
        success, _ = self.run_command(["zfs", "list", name])
        return success

    @staticmethod
    def _creation_sort_key(snapshot: Dict) -> Tuple[datetime, str]:
        """Sort key putting snapshots in chronological creation order.

        Creation strings that cannot be parsed sort before all parsed
        ones and are ordered among themselves by snapshot suffix.
        """
        try:
            created = datetime.strptime(snapshot['creation'], _CREATION_FORMAT)
        except ValueError:
            created = datetime.min
        return created, snapshot['suffix']

    def get_dataset_snapshots(self, dataset: str) -> List[Dict]:
        """Get snapshots for a dataset that match PDG naming convention.

        A snapshot qualifies when the part after '@' starts with the
        dataset name with every '/' replaced by '-', followed by '-'.

        Returns:
            A list of dicts with keys 'name' (full snapshot name),
            'suffix' (part after '@') and 'creation' (creation column as
            printed by zfs), newest first.  Empty when the zfs command
            fails or nothing matches.
        """
        expected_prefix = f"{dataset.replace('/', '-')}-"
        own_prefix = f"{dataset}@"

        success, output = self.run_command(
            ["zfs", "list", "-H", "-o", "name,creation", "-t", "snapshot",
             "-r", dataset]
        )
        if not success:
            return []

        snapshots = []
        for line in output.splitlines():
            parts = line.split('\t')
            if len(parts) < 2:
                continue
            snap_name = parts[0].strip()
            # -r also lists snapshots of child datasets; keep only our own.
            if not snap_name.startswith(own_prefix):
                continue
            snap_suffix = snap_name[len(own_prefix):]
            if snap_suffix.startswith(expected_prefix):
                snapshots.append({
                    'name': snap_name,
                    'suffix': snap_suffix,
                    'creation': parts[1].strip(),
                })

        # Newest first, by real date rather than by the text of the column.
        snapshots.sort(key=self._creation_sort_key, reverse=True)
        return snapshots

    # ------------------------------------------------------------------
    # Interactive selection
    # ------------------------------------------------------------------

    def select_dataset(self) -> Optional[str]:
        """Interactive dataset selection.

        Drill clones (last path component starting with the clone
        prefix) are hidden.  Up to 100 datasets are shown as a numbered
        menu; beyond that the name is typed in and checked with zfs.

        Returns:
            The chosen dataset name, or None when nothing is available
            or the user cancels.
        """
        datasets = [
            d for d in self.get_zfs_datasets()
            if not d.split('/')[-1].startswith(self.clone_prefix)
        ]
        if not datasets:
            print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}")
            return None

        if len(datasets) > self._MAX_LISTED_DATASETS:
            return self._prompt_dataset_name(len(datasets))

        print(f"\n{Colors.BOLD}Available Services (Datasets):{Colors.ENDC}")
        for i, dataset in enumerate(datasets, 1):
            print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}")

        idx = self._prompt_index("service", len(datasets))
        return None if idx is None else datasets[idx]

    def _prompt_dataset_name(self, total: int) -> Optional[str]:
        """Ask for a dataset name by hand until it exists or the user gives up."""
        print(f"\n{Colors.YELLOW}Found {total} datasets - too many to list{Colors.ENDC}")
        print(f"{Colors.DIM}Tip: Use 'zfs list' to see all available datasets{Colors.ENDC}")

        while True:
            dataset = input(f"\n{Colors.BOLD}Enter dataset name: {Colors.ENDC}").strip()
            if not dataset:
                return None
            if self._dataset_exists(dataset):
                return dataset
            print(f"{Colors.RED}✗ Dataset '{dataset}' not found{Colors.ENDC}")
            retry = input(f"{Colors.DIM}Try again? (y/n): {Colors.ENDC}").strip().lower()
            if retry != 'y':
                return None

    def select_snapshot_by_date(self, dataset: str) -> Optional[str]:
        """Hierarchical snapshot selection by year/month/day.

        Snapshots whose suffix contains no YYYY-MM-DD_HH-MM-SS timestamp
        are ignored.  A level that offers only one choice is skipped.

        Returns:
            The full name of the confirmed snapshot, or None when there
            is nothing to choose from or the user cancels.
        """
        snapshots = self.get_dataset_snapshots(dataset)
        if not snapshots:
            print(f"{Colors.RED}✗ No PDG snapshots found for {dataset}{Colors.ENDC}")
            return None

        # years[year][month][day] -> list of snapshot dicts
        years: Dict[str, Dict[str, Dict[str, List[Dict]]]] = {}
        for snap in snapshots:
            match = _SNAPSHOT_TIMESTAMP_RE.search(snap['suffix'])
            if not match:
                continue
            year, month, day, hour, minute, second = match.groups()
            entry = {
                **snap,
                'year': year,
                'month': month,
                'day': day,
                'time': f"{hour}:{minute}:{second}",
                'full_date': f"{year}-{month}-{day}",
            }
            years.setdefault(year, {}).setdefault(month, {}).setdefault(day, []).append(entry)

        if not years:
            print(f"{Colors.RED}✗ No properly formatted PDG snapshots found{Colors.ENDC}")
            return None

        selected_year = self._select_year(years) if len(years) > 1 else next(iter(years))
        if not selected_year:
            return None
        months = years[selected_year]

        selected_month = (self._select_month(months, selected_year)
                          if len(months) > 1 else next(iter(months)))
        if not selected_month:
            return None
        days = months[selected_month]

        selected_day = (self._select_day(days, selected_year, selected_month)
                        if len(days) > 1 else next(iter(days)))
        if not selected_day:
            return None

        return self._select_time(days[selected_day], selected_year,
                                 selected_month, selected_day)

    def _select_year(self, years: Dict) -> Optional[str]:
        """Select year from available years (latest listed first)."""
        year_list = sorted(years, reverse=True)

        print(f"\n{Colors.BOLD}📅 Year Selection{Colors.ENDC}")
        for i, year in enumerate(year_list, 1):
            total_snapshots = sum(
                len(day_snaps)
                for month in years[year].values()
                for day_snaps in month.values()
            )
            print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {year} ({total_snapshots} snapshots)")

        idx = self._prompt_index("year", len(year_list))
        return None if idx is None else year_list[idx]

    def _select_month(self, months: Dict, year: str) -> Optional[str]:
        """Select month from available months (latest listed first)."""
        month_list = sorted(months, reverse=True)

        print(f"\n{Colors.BOLD}📅 Month Selection for {year}{Colors.ENDC}")
        for i, month in enumerate(month_list, 1):
            total_snapshots = sum(len(day_snaps) for day_snaps in months[month].values())
            print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {self._month_label(month)} ({total_snapshots} snapshots)")

        idx = self._prompt_index("month", len(month_list))
        return None if idx is None else month_list[idx]

    def _select_day(self, days: Dict, year: str, month: str) -> Optional[str]:
        """Select day from available days (latest listed first)."""
        day_list = sorted(days, reverse=True)
        month_name = self._month_label(month)

        print(f"\n{Colors.BOLD}📅 Day Selection for {month_name} {year}{Colors.ENDC}")
        for i, day in enumerate(day_list, 1):
            snapshot_count = len(days[day])
            try:
                day_name = datetime(int(year), int(month), int(day)).strftime('%A')
            except ValueError:
                # Not a real calendar date (e.g. month 13): show it raw.
                print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} Day {day} ({snapshot_count} snapshots)")
            else:
                print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {day_name}, {month_name} {day} ({snapshot_count} snapshots)")

        idx = self._prompt_index("day", len(day_list))
        return None if idx is None else day_list[idx]

    def _select_time(self, snapshots: List[Dict], year: str, month: str, day: str) -> Optional[str]:
        """Select specific snapshot by time and confirm it.

        Args:
            snapshots: Snapshot dicts of one day, each with 'name',
                'time' and 'creation'.  The list is not modified.

        Returns:
            The full snapshot name once confirmed with 'y'.  Any other
            answer restarts the date selection for the same dataset.
            None when the user cancels.
        """
        month_name = self._month_label(month)
        ordered = sorted(snapshots, key=lambda snap: snap['time'], reverse=True)

        print(f"\n{Colors.BOLD}🕐 Time Selection for {month_name} {day}, {year}{Colors.ENDC}")
        for i, snap in enumerate(ordered, 1):
            creation_parts = snap['creation'].split()
            if len(creation_parts) >= 4:
                # Drop the weekday and the year: "Jan 3 10:15".
                creation_display = f"{creation_parts[1]} {creation_parts[2]} {creation_parts[3]}"
            else:
                creation_display = snap['creation']
            print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {snap['time']} {Colors.DIM}(created: {creation_display}){Colors.ENDC}")

        idx = self._prompt_index("snapshot", len(ordered))
        if idx is None:
            return None
        selected_snap = ordered[idx]

        print(f"\n{Colors.YELLOW}Selected: {month_name} {day}, {year} at {selected_snap['time']}{Colors.ENDC}")
        print(f"{Colors.DIM}Snapshot: {selected_snap['name']}{Colors.ENDC}")
        try:
            confirm = input(f"{Colors.BOLD}Proceed with this snapshot? (y/N): {Colors.ENDC}")
        except (KeyboardInterrupt, EOFError):
            return None

        if confirm.lower() == 'y':
            return selected_snap['name']
        # Declined: start over from the top of the date hierarchy.
        return self.select_snapshot_by_date(selected_snap['name'].split('@')[0])

    # ------------------------------------------------------------------
    # Clone and export
    # ------------------------------------------------------------------

    def _clone_name_for(self, dataset: str) -> str:
        """Return the drill clone name for *dataset*.

        The clone prefix is put on the last path component, e.g.
        'tank/apps/web' -> 'tank/apps/PDGdrill-web'.
        NOTE(review): a dataset without '/' yields a name with no parent
        component; presumably `zfs clone` rejects that - confirm.
        """
        parent, separator, leaf = dataset.rpartition('/')
        return f"{parent}{separator}{self.clone_prefix}{leaf}"

    def clone_snapshot(self, snapshot: str, dataset: str) -> Tuple[bool, str]:
        """Clone snapshot to PDGdrill naming.

        An existing clone of the same name is destroyed first.

        Returns:
            (True, "Clone created: <clone name>") on success, otherwise
            (False, <reason including the zfs output>).
        """
        clone_name = self._clone_name_for(dataset)

        exists, _ = self.run_command(["zfs", "list", clone_name])
        if exists:
            print(f"{Colors.YELLOW}Existing drill clone found, removing...{Colors.ENDC}")
            destroy_success, destroy_msg = self.run_command(["zfs", "destroy", clone_name])
            if not destroy_success:
                return False, f"Failed to remove existing clone: {destroy_msg}"

        success, output = self.run_command(["zfs", "clone", snapshot, clone_name])
        if success:
            return True, f"Clone created: {clone_name}"
        return False, f"Failed to create clone: {output}"

    @staticmethod
    def _is_exported(exports_text: str, export_path: str) -> bool:
        """Return True when a non-comment line's first field is *export_path*."""
        for line in exports_text.splitlines():
            fields = line.split()
            if fields and not fields[0].startswith('#') and fields[0] == export_path:
                return True
        return False

    def update_nfs_exports(self, clone_name: str) -> Tuple[bool, str]:
        """Add clone to NFS exports if not already present.

        The exported path is '/' + clone_name.
        NOTE(review): this assumes the clone is mounted at that path -
        confirm no custom mountpoints are in use.

        Returns:
            (True, message) when the entry already exists or was added
            and `exportfs -ra` succeeded; (False, message) when the file
            could not be read/written or the reload failed.
        """
        export_path = f"/{clone_name}"
        export_line = f"{export_path} *(rw,sync,no_root_squash,no_subtree_check)\n"

        try:
            existing = ""
            if os.path.exists(self.nfs_exports_file):
                with open(self.nfs_exports_file, 'r') as f:
                    existing = f.read()

            if self._is_exported(existing, export_path):
                return True, "NFS export already exists"

            with open(self.nfs_exports_file, 'a') as f:
                # Never glue the entry onto an unterminated last line.
                if existing and not existing.endswith('\n'):
                    f.write('\n')
                f.write(export_line)
        except OSError as e:
            return False, f"Failed to update exports file: {str(e)}"

        success, output = self.run_command(["exportfs", "-ra"])
        if success:
            return True, f"NFS export added: {export_path}"
        return False, f"Failed to reload NFS exports: {output}"

    # ------------------------------------------------------------------
    # Workflow
    # ------------------------------------------------------------------

    def run_activation_drill(self):
        """Main activation drill workflow: select, clone, export, report."""
        print(f"\n{Colors.BOLD}🔄 Starting Activation Drill{Colors.ENDC}")

        dataset = self.select_dataset()
        if not dataset:
            return

        snapshot = self.select_snapshot_by_date(dataset)
        if not snapshot:
            return

        print(f"\n{Colors.YELLOW}Preparing activation drill...{Colors.ENDC}")

        self.show_loading("Creating drill clone")
        success, message = self.clone_snapshot(snapshot, dataset)
        if not success:
            print(f"{Colors.RED}✗ {message}{Colors.ENDC}")
            return

        # Computed directly rather than parsed out of the success message.
        clone_name = self._clone_name_for(dataset)
        print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}")

        self.show_loading("Updating NFS exports")
        exported, message = self.update_nfs_exports(clone_name)
        if exported:
            print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}")
        else:
            print(f"{Colors.YELLOW}⚠ {message}{Colors.ENDC}")

        print(f"\n{Colors.GREEN}🎉 Activation drill ready!{Colors.ENDC}")
        print(f"{Colors.DIM}Clone: {clone_name}{Colors.ENDC}")
        if exported:
            print(f"{Colors.DIM}Clients can now connect and mount the drill dataset{Colors.ENDC}")

    def run(self):
        """Main application entry: clear the screen, show banner, run drill.

        Ctrl-C at any point prints a notice and exits with status 0.
        """
        try:
            os.system('clear' if os.name == 'posix' else 'cls')
            self.display_banner()
            self.run_activation_drill()
        except KeyboardInterrupt:
            print(f"\n\n{Colors.DIM}Operation cancelled{Colors.ENDC}")
            sys.exit(0)


if __name__ == "__main__":
    # Warn (but continue) when not running as root.  os.geteuid does not
    # exist on every platform, so check for it first.
    if hasattr(os, "geteuid") and os.geteuid() != 0:
        print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}")
        print(f"{Colors.DIM}NFS export updates may require sudo{Colors.ENDC}")

    server = ActivationDrillServer()
    server.run()