feat: base drill server script

This commit is contained in:
sBubshait 2025-08-05 15:40:38 +03:00
parent 693fe005d3
commit 1831be5658

View File

@ -0,0 +1,329 @@
#!/usr/bin/env python3
"""
PDG Activation Drill Server
Manages snapshot cloning and NFS exports for disaster recovery testing.
"""
import os
import sys
import subprocess
import time
import re
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
class Colors:
"""ANSI color codes for terminal styling"""
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
DIM = '\033[2m'
class ActivationDrillServer:
def __init__(self):
self.clone_prefix = "PDGdrill-"
self.nfs_exports_file = "/etc/exports"
def display_banner(self):
"""Display the Activation Drill Server banner"""
banner = f"""
{Colors.CYAN}{Colors.BOLD}
{Colors.ENDC}
{Colors.DIM} Drill Activation (Server) {Colors.ENDC}
"""
print(banner)
def run_command(self, cmd: str, capture_output: bool = True) -> Tuple[bool, str]:
"""Execute shell command and return success status and output"""
try:
result = subprocess.run(
cmd, shell=True, capture_output=capture_output,
text=True, check=False
)
return result.returncode == 0, result.stdout + result.stderr
except Exception as e:
return False, str(e)
def get_zfs_datasets(self) -> List[str]:
"""Retrieve available ZFS datasets"""
success, output = self.run_command("zfs list -H -o name -t filesystem")
if not success:
return []
return [line.strip() for line in output.split('\n') if line.strip()]
def get_dataset_snapshots(self, dataset: str) -> List[Dict]:
"""Get all snapshots for a dataset with timestamps"""
success, output = self.run_command(f"zfs list -H -o name,creation -t snapshot | grep '^{dataset}@'")
if not success:
return []
snapshots = []
for line in output.split('\n'):
if line.strip():
parts = line.split('\t')
if len(parts) >= 2:
snap_name = parts[0].strip()
creation = parts[1].strip()
# Extract timestamp from snapshot name (assuming default format)
snap_suffix = snap_name.split('@')[1]
snapshots.append({
'name': snap_name,
'suffix': snap_suffix,
'creation': creation
})
# Sort by creation time (newest first)
snapshots.sort(key=lambda x: x['creation'], reverse=True)
return snapshots
def show_loading(self, message: str, duration: float = 2.0):
"""Display a loading animation"""
chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
end_time = time.time() + duration
while time.time() < end_time:
for char in chars:
print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True)
time.sleep(0.1)
if time.time() >= end_time:
break
print(f"\r{' ' * (len(message) + 2)}\r", end="")
def select_dataset(self) -> Optional[str]:
"""Interactive dataset selection"""
datasets = self.get_zfs_datasets()
if not datasets:
print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}")
return None
# Filter out PDGdrill clones from the list
datasets = [d for d in datasets if not d.split('/')[-1].startswith(self.clone_prefix)]
if len(datasets) > 100:
print(f"\n{Colors.YELLOW}Found {len(datasets)} datasets - too many to list{Colors.ENDC}")
print(f"{Colors.DIM}Tip: Use 'zfs list' to see all available datasets{Colors.ENDC}")
while True:
dataset = input(f"\n{Colors.BOLD}Enter dataset name: {Colors.ENDC}").strip()
if not dataset:
return None
success, _ = self.run_command(f"zfs list {dataset}")
if success:
return dataset
else:
print(f"{Colors.RED}✗ Dataset '{dataset}' not found{Colors.ENDC}")
retry = input(f"{Colors.DIM}Try again? (y/n): {Colors.ENDC}").lower()
if retry != 'y':
return None
else:
print(f"\n{Colors.BOLD}Available Services (Datasets):{Colors.ENDC}")
for i, dataset in enumerate(datasets, 1):
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}")
while True:
try:
choice = input(f"\n{Colors.BOLD}Select service (1-{len(datasets)}): {Colors.ENDC}")
idx = int(choice) - 1
if 0 <= idx < len(datasets):
return datasets[idx]
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
except (ValueError, KeyboardInterrupt):
return None
def select_snapshot_by_date(self, dataset: str) -> Optional[str]:
"""Interactive snapshot selection by date"""
snapshots = self.get_dataset_snapshots(dataset)
if not snapshots:
print(f"{Colors.RED}✗ No snapshots found for {dataset}{Colors.ENDC}")
return None
# Group snapshots by date
date_groups = {}
for snap in snapshots:
# Extract date from snapshot suffix (assuming YYYY-MM-DD_HH-MM-SS format)
try:
date_part = snap['suffix'].split('_')[0] if '_' in snap['suffix'] else snap['suffix']
if len(date_part) >= 10: # YYYY-MM-DD
date_key = date_part[:10]
if date_key not in date_groups:
date_groups[date_key] = []
date_groups[date_key].append(snap)
except:
continue
if not date_groups:
print(f"{Colors.RED}✗ No properly formatted snapshots found{Colors.ENDC}")
return None
dates = sorted(date_groups.keys(), reverse=True)
earliest_date = min(dates)
latest_date = max(dates)
print(f"\n{Colors.BOLD}📅 Snapshot Date Selection{Colors.ENDC}")
print(f"{Colors.DIM}({len(snapshots)}) Snapshots available from {earliest_date} to {latest_date}{Colors.ENDC}")
# Show available dates
print(f"\n{Colors.BOLD}Available Dates:{Colors.ENDC}")
for i, date in enumerate(dates, 1):
count = len(date_groups[date])
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {date} ({count} snapshot{'s' if count > 1 else ''})")
# Select date
while True:
try:
choice = input(f"\n{Colors.BOLD}Select date (1-{len(dates)}): {Colors.ENDC}")
idx = int(choice) - 1
if 0 <= idx < len(dates):
selected_date = dates[idx]
break
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
except (ValueError, KeyboardInterrupt):
return None
# Show snapshots for selected date
day_snapshots = date_groups[selected_date]
print(f"\n{Colors.BOLD}Snapshots for {selected_date}:{Colors.ENDC}")
for i, snap in enumerate(day_snapshots, 1):
time_part = "Unknown"
if '_' in snap['suffix']:
time_part = snap['suffix'].split('_')[1].replace('-', ':')
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {time_part} - {snap['creation']}")
# Select specific snapshot
while True:
try:
choice = input(f"\n{Colors.BOLD}Select snapshot (1-{len(day_snapshots)}): {Colors.ENDC}")
idx = int(choice) - 1
if 0 <= idx < len(day_snapshots):
selected_snap = day_snapshots[idx]['name']
# Confirm selection
print(f"\n{Colors.YELLOW}Selected snapshot: {selected_snap}{Colors.ENDC}")
confirm = input(f"{Colors.BOLD}Proceed with this snapshot? (y/N): {Colors.ENDC}")
if confirm.lower() == 'y':
return selected_snap
else:
return self.select_snapshot_by_date(dataset) # Start over
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
except (ValueError, KeyboardInterrupt):
return None
def clone_snapshot(self, snapshot: str, dataset: str) -> Tuple[bool, str]:
"""Clone snapshot to PDGdrill naming"""
dataset_name = dataset.split('/')[-1]
clone_name = f"{'/'.join(dataset.split('/')[:-1])}/{self.clone_prefix}{dataset_name}" if '/' in dataset else f"{self.clone_prefix}{dataset_name}"
# Check if clone already exists and destroy it
success, _ = self.run_command(f"zfs list {clone_name}")
if success:
print(f"{Colors.YELLOW}Existing drill clone found, removing...{Colors.ENDC}")
destroy_success, destroy_msg = self.run_command(f"zfs destroy {clone_name}")
if not destroy_success:
return False, f"Failed to remove existing clone: {destroy_msg}"
# Create new clone
success, output = self.run_command(f"zfs clone {snapshot} {clone_name}")
if success:
return True, f"Clone created: {clone_name}"
else:
return False, f"Failed to create clone: {output}"
def update_nfs_exports(self, clone_name: str) -> Tuple[bool, str]:
"""Add clone to NFS exports if not already present"""
# Check if clone is already in exports
if os.path.exists(self.nfs_exports_file):
with open(self.nfs_exports_file, 'r') as f:
content = f.read()
if clone_name in content:
return True, "NFS export already exists"
# Add to exports
mount_point = f"/{clone_name.split('/')[-1]}"
export_line = f"{mount_point} *(rw,sync,no_root_squash,no_subtree_check)\n"
try:
with open(self.nfs_exports_file, 'a') as f:
f.write(export_line)
# Reload NFS exports
success, output = self.run_command("exportfs -ra")
if success:
return True, f"NFS export added: {mount_point}"
else:
return False, f"Failed to reload NFS exports: {output}"
except Exception as e:
return False, f"Failed to update exports file: {str(e)}"
def run_activation_drill(self):
"""Main activation drill workflow"""
print(f"\n{Colors.BOLD}🔄 Starting Activation Drill{Colors.ENDC}")
# Select service/dataset
dataset = self.select_dataset()
if not dataset:
return
# Select snapshot
snapshot = self.select_snapshot_by_date(dataset)
if not snapshot:
return
print(f"\n{Colors.YELLOW}Preparing activation drill...{Colors.ENDC}")
# Clone snapshot
self.show_loading("Creating drill clone")
success, message = self.clone_snapshot(snapshot, dataset)
if not success:
print(f"{Colors.RED}{message}{Colors.ENDC}")
return
clone_name = message.split(': ')[1] # Extract clone name from success message
print(f"{Colors.GREEN}{message}{Colors.ENDC}")
# Update NFS exports
self.show_loading("Updating NFS exports")
success, message = self.update_nfs_exports(clone_name)
if success:
print(f"{Colors.GREEN}{message}{Colors.ENDC}")
else:
print(f"{Colors.YELLOW}{message}{Colors.ENDC}")
print(f"\n{Colors.GREEN}🎉 Activation drill ready!{Colors.ENDC}")
print(f"{Colors.DIM}Clone: {clone_name}{Colors.ENDC}")
print(f"{Colors.DIM}Clients can now connect and mount the drill dataset{Colors.ENDC}")
def run(self):
"""Main application loop"""
try:
os.system('clear' if os.name == 'posix' else 'cls')
self.display_banner()
self.run_activation_drill()
except KeyboardInterrupt:
print(f"\n\n{Colors.DIM}Operation cancelled{Colors.ENDC}")
sys.exit(0)
if __name__ == "__main__":
# Check if running as root
if os.geteuid() != 0:
print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}")
print(f"{Colors.DIM}NFS export updates may require sudo{Colors.ENDC}")
server = ActivationDrillServer()
server.run()