#!/usr/bin/env python3 """ PDG Activation Drill Client Connects to server and activates services from snapshot clones. """ import os import sys import subprocess import time from typing import List, Dict, Optional, Tuple class Colors: """ANSI color codes for terminal styling""" HEADER = '\033[95m' BLUE = '\033[94m' CYAN = '\033[96m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' DIM = '\033[2m' class ActivationDrillClient: def __init__(self): self.clone_prefix = "PDGdrill-" self.service_types = { "1": ("SAP HANA", "hana"), "2": ("SAP Application", "sap_app"), "3": ("Custom", "custom") } def display_banner(self): """Display the Activation Drill Client banner""" banner = f""" {Colors.CYAN}{Colors.BOLD} ██████╗ ██████╗ ██████╗ ██╔══██╗██╔══██╗██╔════╝ ██████╔╝██║ ██║██║ ███╗ ██╔═══╝ ██║ ██║██║ ██║ ██║ ██████╔╝╚██████╔╝ ╚═╝ ╚═════╝ ╚═════╝ {Colors.ENDC} {Colors.DIM} │ Drill Activation (Client) │{Colors.ENDC} """ print(banner) def run_command(self, cmd: str, capture_output: bool = True, as_user: str = None, show_output: bool = False) -> Tuple[bool, str]: """Execute shell command and return success status and output""" try: if as_user: cmd = f"su - {as_user} -c '{cmd}'" if capture_output and not show_output: result = subprocess.run( cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, check=False ) return result.returncode == 0, result.stdout + result.stderr elif show_output: # Real-time output display print(f"{Colors.DIM} Command output:{Colors.ENDC}") process = subprocess.Popen( cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, bufsize=1 ) output_lines = [] while True: line = process.stdout.readline() if not line and process.poll() is not None: break if line: line = line.rstrip() print(f"{Colors.DIM} {line}{Colors.ENDC}") output_lines.append(line) return_code = process.poll() print() # Add blank line after command output return return_code == 0, '\n'.join(output_lines) else: result = subprocess.run(cmd, shell=True, check=False) return result.returncode == 0, "" except Exception as e: return False, str(e) def show_loading(self, message: str, duration: float = 3.0): """Display a loading animation""" chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" end_time = time.time() + duration while time.time() < end_time: for char in chars: print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True) time.sleep(0.1) if time.time() >= end_time: break print(f"\r{' ' * (len(message) + 2)}\r", end="") def get_dataset_input(self) -> Optional[str]: """Get dataset name from user input""" print(f"\n{Colors.BOLD}📊 Service Dataset Selection{Colors.ENDC}") print(f"{Colors.DIM}Enter the full dataset path including pool{Colors.ENDC}") print(f"{Colors.DIM}Example: pool1/productionHanaDB{Colors.ENDC}") while True: dataset = input(f"\n{Colors.BOLD}Dataset path: {Colors.ENDC}").strip() if not dataset: return None if '/' not in dataset: print(f"{Colors.RED}✗ Please include pool name (e.g., pool1/datasetname){Colors.ENDC}") continue return dataset def select_service_type(self) -> Optional[str]: """Select activation service type""" print(f"\n{Colors.BOLD}🔧 Activation Type Selection{Colors.ENDC}") for key, (name, _) in self.service_types.items(): if key == "1": print(f"{Colors.CYAN}{key}.{Colors.ENDC} {name} {Colors.GREEN}[Available]{Colors.ENDC}") else: print(f"{Colors.CYAN}{key}.{Colors.ENDC} {name} {Colors.RED}[Not Implemented]{Colors.ENDC}") while True: choice = input(f"\n{Colors.BOLD}Select activation type (1-3): {Colors.ENDC}") if choice in self.service_types: name, service_type = self.service_types[choice] if service_type != "hana": print(f"{Colors.RED}✗ {name} activation is not yet implemented{Colors.ENDC}") print(f"{Colors.DIM}Only SAP HANA activation is currently supported{Colors.ENDC}") continue return service_type print(f"{Colors.RED}Invalid selection{Colors.ENDC}") def get_server_hostname(self) -> str: """Get server hostname from user""" print(f"\n{Colors.BOLD}🖥️ Server Configuration{Colors.ENDC}") hostname = input(f"{Colors.BOLD}Server hostname/IP (default: server): {Colors.ENDC}").strip() return hostname if hostname else "server" def mount_nfs_dataset(self, server: str, dataset_path: str) -> Tuple[bool, str]: """Mount NFS dataset from server""" dataset_name = dataset_path.split('/')[-1] drill_dataset = f"{self.clone_prefix}{dataset_name}" # Use full pool path for NFS mount pool_part = '/'.join(dataset_path.split('/')[:-1]) # Get pool part full_drill_path = f"{pool_part}/{drill_dataset}" if pool_part else drill_dataset local_mount = "/hana" server_path = f"/{full_drill_path}" # Create mount point if it doesn't exist os.makedirs(local_mount, exist_ok=True) # Check if already mounted success, output = self.run_command(f"mount | grep {local_mount}") if success and local_mount in output: # Unmount first self.run_command(f"umount {local_mount}") # Mount NFS mount_cmd = f"mount -t nfs {server}:{server_path} {local_mount}" success, output = self.run_command(mount_cmd) if success: return True, f"Mounted {server}:{server_path} to {local_mount}" else: return False, f"Failed to mount NFS: {output}" def check_hana_status(self) -> Tuple[bool, str]: """Check HANA database status""" success, output = self.run_command("HDB info", as_user="prcadm", show_output=True) if success and "hdbnameserver" in output.lower(): return True, "HANA is running" else: return False, "HANA is not running or has issues" def activate_hana_service(self, server: str, dataset_path: str): """Complete HANA activation workflow""" print(f"\n{Colors.BOLD}🚀 SAP HANA Activation{Colors.ENDC}") # Mount NFS dataset print(f"{Colors.YELLOW}Mounting drill dataset from server...{Colors.ENDC}") self.show_loading("Connecting to NFS server") success, message = self.mount_nfs_dataset(server, dataset_path) if not success: print(f"{Colors.RED}✗ {message}{Colors.ENDC}") return print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}") # Unregister from system replication print(f"\n{Colors.YELLOW}Unregistering from system replication...{Colors.ENDC}") self.show_loading("Unregistering", 4.0) success, output = self.run_command("hdbnsutil -sr_unregister", as_user="prcadm", show_output=True) if success: print(f"{Colors.GREEN}✓ Successfully unregistered from system replication{Colors.ENDC}") else: print(f"{Colors.YELLOW}⚠ Unregister command completed with warnings{Colors.ENDC}") # Start HANA database print(f"\n{Colors.YELLOW}Starting HANA database...{Colors.ENDC}") self.show_loading("Starting database", 8.0) success, output = self.run_command("HDB start", as_user="prcadm", show_output=True) if success: print(f"{Colors.GREEN}✓ HANA database started successfully{Colors.ENDC}") else: print(f"{Colors.RED}✗ Failed to start HANA database{Colors.ENDC}") return # Check database health print(f"\n{Colors.YELLOW}Checking database health...{Colors.ENDC}") self.show_loading("Verifying status", 3.0) # Wait a moment for services to fully start time.sleep(2) success, message = self.check_hana_status() if success: print(f"{Colors.GREEN}✓ {message} - All systems operational!{Colors.ENDC}") print(f"\n{Colors.GREEN}🎉 HANA Activation Drill Complete!{Colors.ENDC}") print(f"{Colors.DIM}Database is ready for testing on /hana{Colors.ENDC}") else: print(f"{Colors.YELLOW}⚠ Database started but health check inconclusive{Colors.ENDC}") print(f"{Colors.DIM}Please verify manually: su - prcadm -c 'HDB info'{Colors.ENDC}") def run_activation_drill(self): """Main activation drill workflow""" print(f"\n{Colors.BOLD}🔄 Starting Client Activation{Colors.ENDC}") # Get dataset path dataset = self.get_dataset_input() if not dataset: return # Select service type service_type = self.select_service_type() if not service_type: return # Get server hostname server = self.get_server_hostname() # Execute activation based on service type if service_type == "hana": self.activate_hana_service(server, dataset) else: print(f"{Colors.RED}✗ Service type not implemented yet{Colors.ENDC}") def run(self): """Main application loop""" try: os.system('clear' if os.name == 'posix' else 'cls') self.display_banner() self.run_activation_drill() except KeyboardInterrupt: print(f"\n\n{Colors.DIM}Operation cancelled{Colors.ENDC}") sys.exit(0) if __name__ == "__main__": # Check if running as root if os.geteuid() != 0: print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}") print(f"{Colors.DIM}NFS mounting and service management may require sudo{Colors.ENDC}") client = ActivationDrillClient() client.run()