PDG/activationDrill/drillClient.py

261 lines
10 KiB
Python

#!/usr/bin/env python3
"""
PDG Activation Drill Client
Connects to server and activates services from snapshot clones.
"""
import os
import sys
import subprocess
import time
from typing import List, Dict, Optional, Tuple
class Colors:
"""ANSI color codes for terminal styling"""
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
DIM = '\033[2m'
class ActivationDrillClient:
def __init__(self):
self.clone_prefix = "PDGdrill-"
self.service_types = {
"1": ("SAP HANA", "hana"),
"2": ("SAP Application", "sap_app"),
"3": ("Custom", "custom")
}
def display_banner(self):
"""Display the Activation Drill Client banner"""
banner = f"""
{Colors.CYAN}{Colors.BOLD}
██████╗ ██████╗ ██████╗
██╔══██╗██╔══██╗██╔════╝
██████╔╝██║ ██║██║ ███╗
██╔═══╝ ██║ ██║██║ ██║
██║ ██████╔╝╚██████╔╝
╚═╝ ╚═════╝ ╚═════╝
{Colors.ENDC}
{Colors.DIM} │ Drill Activation (Client) │{Colors.ENDC}
"""
print(banner)
def run_command(self, cmd: str, capture_output: bool = True, as_user: str = None) -> Tuple[bool, str]:
"""Execute shell command and return success status and output"""
try:
if as_user:
cmd = f"su - {as_user} -c '{cmd}'"
if capture_output:
result = subprocess.run(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, check=False
)
return result.returncode == 0, result.stdout + result.stderr
else:
result = subprocess.run(cmd, shell=True, check=False)
return result.returncode == 0, ""
except Exception as e:
return False, str(e)
def show_loading(self, message: str, duration: float = 3.0):
"""Display a loading animation"""
chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
end_time = time.time() + duration
while time.time() < end_time:
for char in chars:
print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True)
time.sleep(0.1)
if time.time() >= end_time:
break
print(f"\r{' ' * (len(message) + 2)}\r", end="")
def get_dataset_input(self) -> Optional[str]:
"""Get dataset name from user input"""
print(f"\n{Colors.BOLD}📊 Service Dataset Selection{Colors.ENDC}")
print(f"{Colors.DIM}Enter the full dataset path including pool{Colors.ENDC}")
print(f"{Colors.DIM}Example: pool1/productionHanaDB{Colors.ENDC}")
while True:
dataset = input(f"\n{Colors.BOLD}Dataset path: {Colors.ENDC}").strip()
if not dataset:
return None
if '/' not in dataset:
print(f"{Colors.RED}✗ Please include pool name (e.g., pool1/datasetname){Colors.ENDC}")
continue
return dataset
def select_service_type(self) -> Optional[str]:
"""Select activation service type"""
print(f"\n{Colors.BOLD}🔧 Activation Type Selection{Colors.ENDC}")
for key, (name, _) in self.service_types.items():
if key == "1":
print(f"{Colors.CYAN}{key}.{Colors.ENDC} {name} {Colors.GREEN}[Available]{Colors.ENDC}")
else:
print(f"{Colors.CYAN}{key}.{Colors.ENDC} {name} {Colors.RED}[Not Implemented]{Colors.ENDC}")
while True:
choice = input(f"\n{Colors.BOLD}Select activation type (1-3): {Colors.ENDC}")
if choice in self.service_types:
name, service_type = self.service_types[choice]
if service_type != "hana":
print(f"{Colors.RED}{name} activation is not yet implemented{Colors.ENDC}")
print(f"{Colors.DIM}Only SAP HANA activation is currently supported{Colors.ENDC}")
continue
return service_type
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
def get_server_hostname(self) -> str:
"""Get server hostname from user"""
print(f"\n{Colors.BOLD}🖥️ Server Configuration{Colors.ENDC}")
hostname = input(f"{Colors.BOLD}Server hostname/IP (default: server): {Colors.ENDC}").strip()
return hostname if hostname else "server"
def mount_nfs_dataset(self, server: str, dataset_path: str) -> Tuple[bool, str]:
"""Mount NFS dataset from server"""
dataset_name = dataset_path.split('/')[-1]
drill_dataset = f"{self.clone_prefix}{dataset_name}"
# Use full pool path for NFS mount
pool_part = '/'.join(dataset_path.split('/')[:-1]) # Get pool part
full_drill_path = f"{pool_part}/{drill_dataset}" if pool_part else drill_dataset
local_mount = "/hana"
server_path = f"/{full_drill_path}"
# Create mount point if it doesn't exist
os.makedirs(local_mount, exist_ok=True)
# Check if already mounted
success, output = self.run_command(f"mount | grep {local_mount}")
if success and local_mount in output:
# Unmount first
self.run_command(f"umount {local_mount}")
# Mount NFS
mount_cmd = f"mount -t nfs {server}:{server_path} {local_mount}"
success, output = self.run_command(mount_cmd)
if success:
return True, f"Mounted {server}:{server_path} to {local_mount}"
else:
return False, f"Failed to mount NFS: {output}"
def check_hana_status(self) -> Tuple[bool, str]:
"""Check HANA database status"""
success, output = self.run_command("HDB info", as_user="prcadm")
if success and "hdbnameserver" in output.lower():
return True, "HANA is running"
else:
return False, "HANA is not running or has issues"
def activate_hana_service(self, server: str, dataset_path: str):
"""Complete HANA activation workflow"""
print(f"\n{Colors.BOLD}🚀 SAP HANA Activation{Colors.ENDC}")
# Mount NFS dataset
print(f"{Colors.YELLOW}Mounting drill dataset from server...{Colors.ENDC}")
self.show_loading("Connecting to NFS server")
success, message = self.mount_nfs_dataset(server, dataset_path)
if not success:
print(f"{Colors.RED}{message}{Colors.ENDC}")
return
print(f"{Colors.GREEN}{message}{Colors.ENDC}")
# Unregister from system replication
print(f"\n{Colors.YELLOW}Unregistering from system replication...{Colors.ENDC}")
self.show_loading("Unregistering", 4.0)
success, output = self.run_command("hdbnsutil -sr_unregister", as_user="prcadm")
if success:
print(f"{Colors.GREEN}✓ Successfully unregistered from system replication{Colors.ENDC}")
else:
print(f"{Colors.YELLOW}⚠ Unregister command completed with warnings{Colors.ENDC}")
print(f"{Colors.DIM}{output[:200]}...{Colors.ENDC}")
# Start HANA database
print(f"\n{Colors.YELLOW}Starting HANA database...{Colors.ENDC}")
self.show_loading("Starting database", 8.0)
success, output = self.run_command("HDB start", as_user="prcadm")
if success:
print(f"{Colors.GREEN}✓ HANA database started successfully{Colors.ENDC}")
else:
print(f"{Colors.RED}✗ Failed to start HANA database{Colors.ENDC}")
print(f"{Colors.RED}{output[:300]}{Colors.ENDC}")
return
# Check database health
print(f"\n{Colors.YELLOW}Checking database health...{Colors.ENDC}")
self.show_loading("Verifying status", 3.0)
# Wait a moment for services to fully start
time.sleep(2)
success, message = self.check_hana_status()
if success:
print(f"{Colors.GREEN}{message} - All systems operational!{Colors.ENDC}")
print(f"\n{Colors.GREEN}🎉 HANA Activation Drill Complete!{Colors.ENDC}")
print(f"{Colors.DIM}Database is ready for testing on /hana{Colors.ENDC}")
else:
print(f"{Colors.YELLOW}⚠ Database started but health check inconclusive{Colors.ENDC}")
print(f"{Colors.DIM}Please verify manually: su - prcadm -c 'HDB info'{Colors.ENDC}")
def run_activation_drill(self):
"""Main activation drill workflow"""
print(f"\n{Colors.BOLD}🔄 Starting Client Activation{Colors.ENDC}")
# Get dataset path
dataset = self.get_dataset_input()
if not dataset:
return
# Select service type
service_type = self.select_service_type()
if not service_type:
return
# Get server hostname
server = self.get_server_hostname()
# Execute activation based on service type
if service_type == "hana":
self.activate_hana_service(server, dataset)
else:
print(f"{Colors.RED}✗ Service type not implemented yet{Colors.ENDC}")
def run(self):
"""Main application loop"""
try:
os.system('clear' if os.name == 'posix' else 'cls')
self.display_banner()
self.run_activation_drill()
except KeyboardInterrupt:
print(f"\n\n{Colors.DIM}Operation cancelled{Colors.ENDC}")
sys.exit(0)
if __name__ == "__main__":
# Check if running as root
if os.geteuid() != 0:
print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}")
print(f"{Colors.DIM}NFS mounting and service management may require sudo{Colors.ENDC}")
client = ActivationDrillClient()
client.run()