#!/usr/bin/env python3 """ PDG Activation Drill Server Manages snapshot cloning and NFS exports for disaster recovery testing. Author: Saleh Bubshait Email: Saleh@Bubshait.me Version: 1.0.0-beta """ import os import sys import subprocess import time import re from datetime import datetime, timedelta from typing import List, Dict, Optional, Tuple class Colors: """ANSI color codes for terminal styling""" HEADER = '\033[95m' BLUE = '\033[94m' CYAN = '\033[96m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' DIM = '\033[2m' class ActivationDrillServer: def __init__(self): self.clone_prefix = "PDGdrill-" self.nfs_exports_file = "/etc/exports" def display_banner(self): """Display the Activation Drill Server banner""" banner = f""" {Colors.CYAN}{Colors.BOLD} ██████╗ ██████╗ ██████╗ ██╔══██╗██╔══██╗██╔════╝ ██████╔╝██║ ██║██║ ███╗ ██╔═══╝ ██║ ██║██║ ██║ ██║ ██████╔╝╚██████╔╝ ╚═╝ ╚═════╝ ╚═════╝ {Colors.ENDC} {Colors.DIM} │ Drill Activation (Server) │{Colors.ENDC} """ print(banner) def run_command(self, cmd: str, capture_output: bool = True) -> Tuple[bool, str]: """Execute shell command and return success status and output""" try: result = subprocess.run( cmd, shell=True, capture_output=capture_output, text=True, check=False ) return result.returncode == 0, result.stdout + result.stderr except Exception as e: return False, str(e) def get_zfs_datasets(self) -> List[str]: """Retrieve available ZFS datasets""" success, output = self.run_command("zfs list -H -o name -t filesystem") if not success: return [] return [line.strip() for line in output.split('\n') if line.strip()] def get_dataset_snapshots(self, dataset: str) -> List[Dict]: """Get snapshots for a dataset that match PDG naming convention""" dataset_basename = dataset.split('/')[-1] expected_prefix = f"{dataset.replace('/', '-')}-" success, output = self.run_command(f"zfs list -H -o name,creation -t snapshot | grep '^{dataset}@'") if not success: return [] snapshots = [] for line in output.split('\n'): if line.strip(): parts = line.split('\t') if len(parts) >= 2: snap_name = parts[0].strip() creation = parts[1].strip() # Extract snapshot suffix (after @) snap_suffix = snap_name.split('@')[1] # Only include snapshots that match PDG naming convention if snap_suffix.startswith(expected_prefix): snapshots.append({ 'name': snap_name, 'suffix': snap_suffix, 'creation': creation }) # Sort by creation time (newest first) snapshots.sort(key=lambda x: x['creation'], reverse=True) return snapshots def show_loading(self, message: str, duration: float = 2.0): """Display a loading animation""" chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" end_time = time.time() + duration while time.time() < end_time: for char in chars: print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True) time.sleep(0.1) if time.time() >= end_time: break print(f"\r{' ' * (len(message) + 2)}\r", end="") def select_dataset(self) -> Optional[str]: """Interactive dataset selection""" datasets = self.get_zfs_datasets() if not datasets: print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}") return None # Filter out PDGdrill clones from the list datasets = [d for d in datasets if not d.split('/')[-1].startswith(self.clone_prefix)] if len(datasets) > 100: print(f"\n{Colors.YELLOW}Found {len(datasets)} datasets - too many to list{Colors.ENDC}") print(f"{Colors.DIM}Tip: Use 'zfs list' to see all available datasets{Colors.ENDC}") while True: dataset = input(f"\n{Colors.BOLD}Enter dataset name: {Colors.ENDC}").strip() if not dataset: return None success, _ = self.run_command(f"zfs list {dataset}") if success: return dataset else: print(f"{Colors.RED}✗ Dataset '{dataset}' not found{Colors.ENDC}") retry = input(f"{Colors.DIM}Try again? (y/n): {Colors.ENDC}").lower() if retry != 'y': return None else: print(f"\n{Colors.BOLD}Available Services (Datasets):{Colors.ENDC}") for i, dataset in enumerate(datasets, 1): print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}") while True: try: choice = input(f"\n{Colors.BOLD}Select service (1-{len(datasets)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(datasets): return datasets[idx] print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None def select_snapshot_by_date(self, dataset: str) -> Optional[str]: """Hierarchical snapshot selection by year/month/day""" snapshots = self.get_dataset_snapshots(dataset) if not snapshots: print(f"{Colors.RED}✗ No PDG snapshots found for {dataset}{Colors.ENDC}") return None # Parse all snapshots and group hierarchically parsed_snapshots = [] for snap in snapshots: try: suffix = snap['suffix'] import re date_pattern = r'(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})' match = re.search(date_pattern, suffix) if match: year, month, day, hour, minute, second = match.groups() parsed_snapshots.append({ **snap, 'year': year, 'month': month, 'day': day, 'time': f"{hour}:{minute}:{second}", 'full_date': f"{year}-{month}-{day}" }) except Exception: continue if not parsed_snapshots: print(f"{Colors.RED}✗ No properly formatted PDG snapshots found{Colors.ENDC}") return None # Group by year, month, day hierarchically years = {} for snap in parsed_snapshots: year = snap['year'] month = snap['month'] day = snap['day'] if year not in years: years[year] = {} if month not in years[year]: years[year][month] = {} if day not in years[year][month]: years[year][month][day] = [] years[year][month][day].append(snap) # Hierarchical selection selected_year = self._select_year(years) if len(years) > 1 else list(years.keys())[0] if not selected_year: return None selected_month = self._select_month(years[selected_year], selected_year) if len(years[selected_year]) > 1 else list(years[selected_year].keys())[0] if not selected_month: return None selected_day = self._select_day(years[selected_year][selected_month], selected_year, selected_month) if len(years[selected_year][selected_month]) > 1 else list(years[selected_year][selected_month].keys())[0] if not selected_day: return None # Select specific snapshot from the chosen day day_snapshots = years[selected_year][selected_month][selected_day] return self._select_time(day_snapshots, selected_year, selected_month, selected_day) def _select_year(self, years: Dict) -> Optional[str]: """Select year from available years""" year_list = sorted(years.keys(), reverse=True) print(f"\n{Colors.BOLD}📅 Year Selection{Colors.ENDC}") for i, year in enumerate(year_list, 1): total_snapshots = sum(len(day_snaps) for month in years[year].values() for day_snaps in month.values()) print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {year} ({total_snapshots} snapshots)") while True: try: choice = input(f"\n{Colors.BOLD}Select year (1-{len(year_list)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(year_list): return year_list[idx] print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None def _select_month(self, months: Dict, year: str) -> Optional[str]: """Select month from available months""" month_list = sorted(months.keys(), reverse=True) month_names = ["", "January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December"] print(f"\n{Colors.BOLD}📅 Month Selection for {year}{Colors.ENDC}") for i, month in enumerate(month_list, 1): total_snapshots = sum(len(day_snaps) for day_snaps in months[month].values()) month_name = month_names[int(month)] if 1 <= int(month) <= 12 else month print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {month_name} ({total_snapshots} snapshots)") while True: try: choice = input(f"\n{Colors.BOLD}Select month (1-{len(month_list)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(month_list): return month_list[idx] print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None def _select_day(self, days: Dict, year: str, month: str) -> Optional[str]: """Select day from available days""" day_list = sorted(days.keys(), reverse=True) month_names = ["", "January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December"] month_name = month_names[int(month)] if 1 <= int(month) <= 12 else month print(f"\n{Colors.BOLD}📅 Day Selection for {month_name} {year}{Colors.ENDC}") for i, day in enumerate(day_list, 1): snapshot_count = len(days[day]) try: from datetime import datetime date_obj = datetime(int(year), int(month), int(day)) day_name = date_obj.strftime('%A') print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {day_name}, {month_name} {day} ({snapshot_count} snapshots)") except: print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} Day {day} ({snapshot_count} snapshots)") while True: try: choice = input(f"\n{Colors.BOLD}Select day (1-{len(day_list)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(day_list): return day_list[idx] print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None def _select_time(self, snapshots: List[Dict], year: str, month: str, day: str) -> Optional[str]: """Select specific snapshot by time""" month_names = ["", "January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December"] month_name = month_names[int(month)] if 1 <= int(month) <= 12 else month # Sort by time (newest first) snapshots.sort(key=lambda x: x['time'], reverse=True) print(f"\n{Colors.BOLD}🕐 Time Selection for {month_name} {day}, {year}{Colors.ENDC}") for i, snap in enumerate(snapshots, 1): creation_parts = snap['creation'].split() if len(creation_parts) >= 4: creation_display = f"{creation_parts[1]} {creation_parts[2]} {creation_parts[3]}" else: creation_display = snap['creation'] print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {snap['time']} {Colors.DIM}(created: {creation_display}){Colors.ENDC}") while True: try: choice = input(f"\n{Colors.BOLD}Select snapshot (1-{len(snapshots)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(snapshots): selected_snap = snapshots[idx] # Confirm selection print(f"\n{Colors.YELLOW}Selected: {month_name} {day}, {year} at {selected_snap['time']}{Colors.ENDC}") print(f"{Colors.DIM}Snapshot: {selected_snap['name']}{Colors.ENDC}") confirm = input(f"{Colors.BOLD}Proceed with this snapshot? (y/N): {Colors.ENDC}") if confirm.lower() == 'y': return selected_snap['name'] else: return self.select_snapshot_by_date(selected_snap['name'].split('@')[0]) # Start over print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None def clone_snapshot(self, snapshot: str, dataset: str) -> Tuple[bool, str]: """Clone snapshot to PDGdrill naming""" dataset_name = dataset.split('/')[-1] clone_name = f"{'/'.join(dataset.split('/')[:-1])}/{self.clone_prefix}{dataset_name}" if '/' in dataset else f"{self.clone_prefix}{dataset_name}" # Check if clone already exists and destroy it success, _ = self.run_command(f"zfs list {clone_name}") if success: print(f"{Colors.YELLOW}Existing drill clone found, removing...{Colors.ENDC}") destroy_success, destroy_msg = self.run_command(f"zfs destroy {clone_name}") if not destroy_success: return False, f"Failed to remove existing clone: {destroy_msg}" # Create new clone success, output = self.run_command(f"zfs clone {snapshot} {clone_name}") if success: return True, f"Clone created: {clone_name}" else: return False, f"Failed to create clone: {output}" def update_nfs_exports(self, clone_name: str) -> Tuple[bool, str]: """Add clone to NFS exports if not already present""" # Use full pool path for NFS export export_path = f"/{clone_name}" # Check if clone is already in exports if os.path.exists(self.nfs_exports_file): with open(self.nfs_exports_file, 'r') as f: content = f.read() if export_path in content: return True, "NFS export already exists" # Add to exports with full path export_line = f"{export_path} *(rw,sync,no_root_squash,no_subtree_check)\n" try: with open(self.nfs_exports_file, 'a') as f: f.write(export_line) # Reload NFS exports success, output = self.run_command("exportfs -ra") if success: return True, f"NFS export added: {export_path}" else: return False, f"Failed to reload NFS exports: {output}" except Exception as e: return False, f"Failed to update exports file: {str(e)}" def run_activation_drill(self): """Main activation drill workflow""" print(f"\n{Colors.BOLD}🔄 Starting Activation Drill{Colors.ENDC}") # Select service/dataset dataset = self.select_dataset() if not dataset: return # Select snapshot snapshot = self.select_snapshot_by_date(dataset) if not snapshot: return print(f"\n{Colors.YELLOW}Preparing activation drill...{Colors.ENDC}") # Clone snapshot self.show_loading("Creating drill clone") success, message = self.clone_snapshot(snapshot, dataset) if not success: print(f"{Colors.RED}✗ {message}{Colors.ENDC}") return clone_name = message.split(': ')[1] # Extract clone name from success message print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}") # Update NFS exports self.show_loading("Updating NFS exports") success, message = self.update_nfs_exports(clone_name) if success: print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}") else: print(f"{Colors.YELLOW}⚠ {message}{Colors.ENDC}") print(f"\n{Colors.GREEN}🎉 Activation drill ready!{Colors.ENDC}") print(f"{Colors.DIM}Clone: {clone_name}{Colors.ENDC}") print(f"{Colors.DIM}Clients can now connect and mount the drill dataset{Colors.ENDC}") def run(self): """Main application loop""" try: os.system('clear' if os.name == 'posix' else 'cls') self.display_banner() self.run_activation_drill() except KeyboardInterrupt: print(f"\n\n{Colors.DIM}Operation cancelled{Colors.ENDC}") sys.exit(0) if __name__ == "__main__": # Check if running as root if os.geteuid() != 0: print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}") print(f"{Colors.DIM}NFS export updates may require sudo{Colors.ENDC}") server = ActivationDrillServer() server.run()