#!/usr/bin/env python3 """ PDG Activation Drill Server Manages snapshot cloning and NFS exports for disaster recovery testing. """ import os import sys import subprocess import time import re from datetime import datetime, timedelta from typing import List, Dict, Optional, Tuple class Colors: """ANSI color codes for terminal styling""" HEADER = '\033[95m' BLUE = '\033[94m' CYAN = '\033[96m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' DIM = '\033[2m' class ActivationDrillServer: def __init__(self): self.clone_prefix = "PDGdrill-" self.nfs_exports_file = "/etc/exports" def display_banner(self): """Display the Activation Drill Server banner""" banner = f""" {Colors.CYAN}{Colors.BOLD} ██████╗ ██████╗ ██████╗ ██╔══██╗██╔══██╗██╔════╝ ██████╔╝██║ ██║██║ ███╗ ██╔═══╝ ██║ ██║██║ ██║ ██║ ██████╔╝╚██████╔╝ ╚═╝ ╚═════╝ ╚═════╝ {Colors.ENDC} {Colors.DIM} │ Drill Activation (Server) │{Colors.ENDC} """ print(banner) def run_command(self, cmd: str, capture_output: bool = True) -> Tuple[bool, str]: """Execute shell command and return success status and output""" try: result = subprocess.run( cmd, shell=True, capture_output=capture_output, text=True, check=False ) return result.returncode == 0, result.stdout + result.stderr except Exception as e: return False, str(e) def get_zfs_datasets(self) -> List[str]: """Retrieve available ZFS datasets""" success, output = self.run_command("zfs list -H -o name -t filesystem") if not success: return [] return [line.strip() for line in output.split('\n') if line.strip()] def get_dataset_snapshots(self, dataset: str) -> List[Dict]: """Get all snapshots for a dataset with timestamps""" success, output = self.run_command(f"zfs list -H -o name,creation -t snapshot | grep '^{dataset}@'") if not success: return [] snapshots = [] for line in output.split('\n'): if line.strip(): parts = line.split('\t') if len(parts) >= 2: snap_name = parts[0].strip() creation = parts[1].strip() # Extract timestamp from snapshot name (assuming default format) snap_suffix = snap_name.split('@')[1] snapshots.append({ 'name': snap_name, 'suffix': snap_suffix, 'creation': creation }) # Sort by creation time (newest first) snapshots.sort(key=lambda x: x['creation'], reverse=True) return snapshots def show_loading(self, message: str, duration: float = 2.0): """Display a loading animation""" chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" end_time = time.time() + duration while time.time() < end_time: for char in chars: print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True) time.sleep(0.1) if time.time() >= end_time: break print(f"\r{' ' * (len(message) + 2)}\r", end="") def select_dataset(self) -> Optional[str]: """Interactive dataset selection""" datasets = self.get_zfs_datasets() if not datasets: print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}") return None # Filter out PDGdrill clones from the list datasets = [d for d in datasets if not d.split('/')[-1].startswith(self.clone_prefix)] if len(datasets) > 100: print(f"\n{Colors.YELLOW}Found {len(datasets)} datasets - too many to list{Colors.ENDC}") print(f"{Colors.DIM}Tip: Use 'zfs list' to see all available datasets{Colors.ENDC}") while True: dataset = input(f"\n{Colors.BOLD}Enter dataset name: {Colors.ENDC}").strip() if not dataset: return None success, _ = self.run_command(f"zfs list {dataset}") if success: return dataset else: print(f"{Colors.RED}✗ Dataset '{dataset}' not found{Colors.ENDC}") retry = input(f"{Colors.DIM}Try again? (y/n): {Colors.ENDC}").lower() if retry != 'y': return None else: print(f"\n{Colors.BOLD}Available Services (Datasets):{Colors.ENDC}") for i, dataset in enumerate(datasets, 1): print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}") while True: try: choice = input(f"\n{Colors.BOLD}Select service (1-{len(datasets)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(datasets): return datasets[idx] print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None def select_snapshot_by_date(self, dataset: str) -> Optional[str]: """Interactive snapshot selection by date""" snapshots = self.get_dataset_snapshots(dataset) if not snapshots: print(f"{Colors.RED}✗ No snapshots found for {dataset}{Colors.ENDC}") return None # Group snapshots by date date_groups = {} for snap in snapshots: # Parse snapshot suffix: format is [dataset-name]-YYYY-MM-DD_HH-MM-SS # Example: pool1-hanaStandby_nfs-2025-08-05_15-36-10 try: suffix = snap['suffix'] # Find the last occurrence of a date pattern (YYYY-MM-DD) # This handles cases where dataset names contain dashes import re date_pattern = r'(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})' match = re.search(date_pattern, suffix) if match: date_part = match.group(1) # YYYY-MM-DD time_part = match.group(2) # HH-MM-SS if date_part not in date_groups: date_groups[date_part] = [] # Store both date and time info snap['parsed_date'] = date_part snap['parsed_time'] = time_part.replace('-', ':') date_groups[date_part].append(snap) except Exception as e: print(f"{Colors.DIM}Warning: Could not parse snapshot {snap['suffix']}: {e}{Colors.ENDC}") continue if not date_groups: print(f"{Colors.RED}✗ No properly formatted snapshots found{Colors.ENDC}") print(f"{Colors.DIM}Expected format: [dataset-name]-YYYY-MM-DD_HH-MM-SS{Colors.ENDC}") return None dates = sorted(date_groups.keys(), reverse=True) earliest_date = min(dates) latest_date = max(dates) print(f"\n{Colors.BOLD}📅 Snapshot Date Selection{Colors.ENDC}") print(f"{Colors.DIM}({len(snapshots)}) Snapshots available from {earliest_date} to {latest_date}{Colors.ENDC}") # Show available dates print(f"\n{Colors.BOLD}Available Dates:{Colors.ENDC}") for i, date in enumerate(dates, 1): count = len(date_groups[date]) # Format date nicely try: from datetime import datetime date_obj = datetime.strptime(date, '%Y-%m-%d') formatted_date = date_obj.strftime('%B %d, %Y (%A)') print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {formatted_date} ({count} snapshot{'s' if count > 1 else ''})") except: print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {date} ({count} snapshot{'s' if count > 1 else ''})") # Select date while True: try: choice = input(f"\n{Colors.BOLD}Select date (1-{len(dates)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(dates): selected_date = dates[idx] break print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None # Show snapshots for selected date day_snapshots = date_groups[selected_date] # Format the selected date nicely try: from datetime import datetime date_obj = datetime.strptime(selected_date, '%Y-%m-%d') formatted_date = date_obj.strftime('%B %d, %Y') except: formatted_date = selected_date print(f"\n{Colors.BOLD}Snapshots for {formatted_date}:{Colors.ENDC}") for i, snap in enumerate(day_snapshots, 1): time_display = snap.get('parsed_time', 'Unknown time') # Make creation time more readable creation_parts = snap['creation'].split() if len(creation_parts) >= 4: # Show just date and time, skip timezone and year if same creation_display = f"{creation_parts[1]} {creation_parts[2]} {creation_parts[3]}" else: creation_display = snap['creation'] print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {time_display} {Colors.DIM}(created: {creation_display}){Colors.ENDC}") # Select specific snapshot while True: try: choice = input(f"\n{Colors.BOLD}Select snapshot (1-{len(day_snapshots)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(day_snapshots): selected_snap = day_snapshots[idx]['name'] selected_time = day_snapshots[idx].get('parsed_time', 'Unknown') # Confirm selection print(f"\n{Colors.YELLOW}Selected: {formatted_date} at {selected_time}{Colors.ENDC}") print(f"{Colors.DIM}Snapshot: {selected_snap}{Colors.ENDC}") confirm = input(f"{Colors.BOLD}Proceed with this snapshot? (y/N): {Colors.ENDC}") if confirm.lower() == 'y': return selected_snap else: return self.select_snapshot_by_date(dataset) # Start over print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None def clone_snapshot(self, snapshot: str, dataset: str) -> Tuple[bool, str]: """Clone snapshot to PDGdrill naming""" dataset_name = dataset.split('/')[-1] clone_name = f"{'/'.join(dataset.split('/')[:-1])}/{self.clone_prefix}{dataset_name}" if '/' in dataset else f"{self.clone_prefix}{dataset_name}" # Check if clone already exists and destroy it success, _ = self.run_command(f"zfs list {clone_name}") if success: print(f"{Colors.YELLOW}Existing drill clone found, removing...{Colors.ENDC}") destroy_success, destroy_msg = self.run_command(f"zfs destroy {clone_name}") if not destroy_success: return False, f"Failed to remove existing clone: {destroy_msg}" # Create new clone success, output = self.run_command(f"zfs clone {snapshot} {clone_name}") if success: return True, f"Clone created: {clone_name}" else: return False, f"Failed to create clone: {output}" def update_nfs_exports(self, clone_name: str) -> Tuple[bool, str]: """Add clone to NFS exports if not already present""" # Check if clone is already in exports if os.path.exists(self.nfs_exports_file): with open(self.nfs_exports_file, 'r') as f: content = f.read() if clone_name in content: return True, "NFS export already exists" # Add to exports mount_point = f"/{clone_name.split('/')[-1]}" export_line = f"{mount_point} *(rw,sync,no_root_squash,no_subtree_check)\n" try: with open(self.nfs_exports_file, 'a') as f: f.write(export_line) # Reload NFS exports success, output = self.run_command("exportfs -ra") if success: return True, f"NFS export added: {mount_point}" else: return False, f"Failed to reload NFS exports: {output}" except Exception as e: return False, f"Failed to update exports file: {str(e)}" def run_activation_drill(self): """Main activation drill workflow""" print(f"\n{Colors.BOLD}🔄 Starting Activation Drill{Colors.ENDC}") # Select service/dataset dataset = self.select_dataset() if not dataset: return # Select snapshot snapshot = self.select_snapshot_by_date(dataset) if not snapshot: return print(f"\n{Colors.YELLOW}Preparing activation drill...{Colors.ENDC}") # Clone snapshot self.show_loading("Creating drill clone") success, message = self.clone_snapshot(snapshot, dataset) if not success: print(f"{Colors.RED}✗ {message}{Colors.ENDC}") return clone_name = message.split(': ')[1] # Extract clone name from success message print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}") # Update NFS exports self.show_loading("Updating NFS exports") success, message = self.update_nfs_exports(clone_name) if success: print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}") else: print(f"{Colors.YELLOW}⚠ {message}{Colors.ENDC}") print(f"\n{Colors.GREEN}🎉 Activation drill ready!{Colors.ENDC}") print(f"{Colors.DIM}Clone: {clone_name}{Colors.ENDC}") print(f"{Colors.DIM}Clients can now connect and mount the drill dataset{Colors.ENDC}") def run(self): """Main application loop""" try: os.system('clear' if os.name == 'posix' else 'cls') self.display_banner() self.run_activation_drill() except KeyboardInterrupt: print(f"\n\n{Colors.DIM}Operation cancelled{Colors.ENDC}") sys.exit(0) if __name__ == "__main__": # Check if running as root if os.geteuid() != 0: print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}") print(f"{Colors.DIM}NFS export updates may require sudo{Colors.ENDC}") server = ActivationDrillServer() server.run()