Compare commits

...

3 Commits

Author SHA1 Message Date
sBubshait
eb8e078a29 feat: smarter date choosing and fixing nfs mount path 2025-08-05 15:56:42 +03:00
sBubshait
bcc44e8089 fix: incorrect parsing of dates in the snapshot naming 2025-08-05 15:51:04 +03:00
sBubshait
1831be5658 feat: base drill server script 2025-08-05 15:40:38 +03:00

View File

@ -0,0 +1,432 @@
#!/usr/bin/env python3
"""
PDG Activation Drill Server
Manages snapshot cloning and NFS exports for disaster recovery testing.
"""
import os
import sys
import subprocess
import time
import re
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
class Colors:
"""ANSI color codes for terminal styling"""
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
DIM = '\033[2m'
class ActivationDrillServer:
def __init__(self):
self.clone_prefix = "PDGdrill-"
self.nfs_exports_file = "/etc/exports"
def display_banner(self):
"""Display the Activation Drill Server banner"""
banner = f"""
{Colors.CYAN}{Colors.BOLD}
{Colors.ENDC}
{Colors.DIM} Drill Activation (Server) {Colors.ENDC}
"""
print(banner)
def run_command(self, cmd: str, capture_output: bool = True) -> Tuple[bool, str]:
"""Execute shell command and return success status and output"""
try:
result = subprocess.run(
cmd, shell=True, capture_output=capture_output,
text=True, check=False
)
return result.returncode == 0, result.stdout + result.stderr
except Exception as e:
return False, str(e)
def get_zfs_datasets(self) -> List[str]:
"""Retrieve available ZFS datasets"""
success, output = self.run_command("zfs list -H -o name -t filesystem")
if not success:
return []
return [line.strip() for line in output.split('\n') if line.strip()]
def get_dataset_snapshots(self, dataset: str) -> List[Dict]:
"""Get snapshots for a dataset that match PDG naming convention"""
dataset_basename = dataset.split('/')[-1]
expected_prefix = f"{dataset.replace('/', '-')}-"
success, output = self.run_command(f"zfs list -H -o name,creation -t snapshot | grep '^{dataset}@'")
if not success:
return []
snapshots = []
for line in output.split('\n'):
if line.strip():
parts = line.split('\t')
if len(parts) >= 2:
snap_name = parts[0].strip()
creation = parts[1].strip()
# Extract snapshot suffix (after @)
snap_suffix = snap_name.split('@')[1]
# Only include snapshots that match PDG naming convention
if snap_suffix.startswith(expected_prefix):
snapshots.append({
'name': snap_name,
'suffix': snap_suffix,
'creation': creation
})
# Sort by creation time (newest first)
snapshots.sort(key=lambda x: x['creation'], reverse=True)
return snapshots
def show_loading(self, message: str, duration: float = 2.0):
"""Display a loading animation"""
chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
end_time = time.time() + duration
while time.time() < end_time:
for char in chars:
print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True)
time.sleep(0.1)
if time.time() >= end_time:
break
print(f"\r{' ' * (len(message) + 2)}\r", end="")
def select_dataset(self) -> Optional[str]:
"""Interactive dataset selection"""
datasets = self.get_zfs_datasets()
if not datasets:
print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}")
return None
# Filter out PDGdrill clones from the list
datasets = [d for d in datasets if not d.split('/')[-1].startswith(self.clone_prefix)]
if len(datasets) > 100:
print(f"\n{Colors.YELLOW}Found {len(datasets)} datasets - too many to list{Colors.ENDC}")
print(f"{Colors.DIM}Tip: Use 'zfs list' to see all available datasets{Colors.ENDC}")
while True:
dataset = input(f"\n{Colors.BOLD}Enter dataset name: {Colors.ENDC}").strip()
if not dataset:
return None
success, _ = self.run_command(f"zfs list {dataset}")
if success:
return dataset
else:
print(f"{Colors.RED}✗ Dataset '{dataset}' not found{Colors.ENDC}")
retry = input(f"{Colors.DIM}Try again? (y/n): {Colors.ENDC}").lower()
if retry != 'y':
return None
else:
print(f"\n{Colors.BOLD}Available Services (Datasets):{Colors.ENDC}")
for i, dataset in enumerate(datasets, 1):
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}")
while True:
try:
choice = input(f"\n{Colors.BOLD}Select service (1-{len(datasets)}): {Colors.ENDC}")
idx = int(choice) - 1
if 0 <= idx < len(datasets):
return datasets[idx]
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
except (ValueError, KeyboardInterrupt):
return None
def select_snapshot_by_date(self, dataset: str) -> Optional[str]:
"""Hierarchical snapshot selection by year/month/day"""
snapshots = self.get_dataset_snapshots(dataset)
if not snapshots:
print(f"{Colors.RED}✗ No PDG snapshots found for {dataset}{Colors.ENDC}")
return None
# Parse all snapshots and group hierarchically
parsed_snapshots = []
for snap in snapshots:
try:
suffix = snap['suffix']
import re
date_pattern = r'(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})'
match = re.search(date_pattern, suffix)
if match:
year, month, day, hour, minute, second = match.groups()
parsed_snapshots.append({
**snap,
'year': year,
'month': month,
'day': day,
'time': f"{hour}:{minute}:{second}",
'full_date': f"{year}-{month}-{day}"
})
except Exception:
continue
if not parsed_snapshots:
print(f"{Colors.RED}✗ No properly formatted PDG snapshots found{Colors.ENDC}")
return None
# Group by year, month, day hierarchically
years = {}
for snap in parsed_snapshots:
year = snap['year']
month = snap['month']
day = snap['day']
if year not in years:
years[year] = {}
if month not in years[year]:
years[year][month] = {}
if day not in years[year][month]:
years[year][month][day] = []
years[year][month][day].append(snap)
# Hierarchical selection
selected_year = self._select_year(years) if len(years) > 1 else list(years.keys())[0]
if not selected_year:
return None
selected_month = self._select_month(years[selected_year], selected_year) if len(years[selected_year]) > 1 else list(years[selected_year].keys())[0]
if not selected_month:
return None
selected_day = self._select_day(years[selected_year][selected_month], selected_year, selected_month) if len(years[selected_year][selected_month]) > 1 else list(years[selected_year][selected_month].keys())[0]
if not selected_day:
return None
# Select specific snapshot from the chosen day
day_snapshots = years[selected_year][selected_month][selected_day]
return self._select_time(day_snapshots, selected_year, selected_month, selected_day)
def _select_year(self, years: Dict) -> Optional[str]:
"""Select year from available years"""
year_list = sorted(years.keys(), reverse=True)
print(f"\n{Colors.BOLD}📅 Year Selection{Colors.ENDC}")
for i, year in enumerate(year_list, 1):
total_snapshots = sum(len(day_snaps) for month in years[year].values() for day_snaps in month.values())
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {year} ({total_snapshots} snapshots)")
while True:
try:
choice = input(f"\n{Colors.BOLD}Select year (1-{len(year_list)}): {Colors.ENDC}")
idx = int(choice) - 1
if 0 <= idx < len(year_list):
return year_list[idx]
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
except (ValueError, KeyboardInterrupt):
return None
def _select_month(self, months: Dict, year: str) -> Optional[str]:
"""Select month from available months"""
month_list = sorted(months.keys(), reverse=True)
month_names = ["", "January", "February", "March", "April", "May", "June",
"July", "August", "September", "October", "November", "December"]
print(f"\n{Colors.BOLD}📅 Month Selection for {year}{Colors.ENDC}")
for i, month in enumerate(month_list, 1):
total_snapshots = sum(len(day_snaps) for day_snaps in months[month].values())
month_name = month_names[int(month)] if 1 <= int(month) <= 12 else month
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {month_name} ({total_snapshots} snapshots)")
while True:
try:
choice = input(f"\n{Colors.BOLD}Select month (1-{len(month_list)}): {Colors.ENDC}")
idx = int(choice) - 1
if 0 <= idx < len(month_list):
return month_list[idx]
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
except (ValueError, KeyboardInterrupt):
return None
def _select_day(self, days: Dict, year: str, month: str) -> Optional[str]:
"""Select day from available days"""
day_list = sorted(days.keys(), reverse=True)
month_names = ["", "January", "February", "March", "April", "May", "June",
"July", "August", "September", "October", "November", "December"]
month_name = month_names[int(month)] if 1 <= int(month) <= 12 else month
print(f"\n{Colors.BOLD}📅 Day Selection for {month_name} {year}{Colors.ENDC}")
for i, day in enumerate(day_list, 1):
snapshot_count = len(days[day])
try:
from datetime import datetime
date_obj = datetime(int(year), int(month), int(day))
day_name = date_obj.strftime('%A')
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {day_name}, {month_name} {day} ({snapshot_count} snapshots)")
except:
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} Day {day} ({snapshot_count} snapshots)")
while True:
try:
choice = input(f"\n{Colors.BOLD}Select day (1-{len(day_list)}): {Colors.ENDC}")
idx = int(choice) - 1
if 0 <= idx < len(day_list):
return day_list[idx]
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
except (ValueError, KeyboardInterrupt):
return None
def _select_time(self, snapshots: List[Dict], year: str, month: str, day: str) -> Optional[str]:
"""Select specific snapshot by time"""
month_names = ["", "January", "February", "March", "April", "May", "June",
"July", "August", "September", "October", "November", "December"]
month_name = month_names[int(month)] if 1 <= int(month) <= 12 else month
# Sort by time (newest first)
snapshots.sort(key=lambda x: x['time'], reverse=True)
print(f"\n{Colors.BOLD}🕐 Time Selection for {month_name} {day}, {year}{Colors.ENDC}")
for i, snap in enumerate(snapshots, 1):
creation_parts = snap['creation'].split()
if len(creation_parts) >= 4:
creation_display = f"{creation_parts[1]} {creation_parts[2]} {creation_parts[3]}"
else:
creation_display = snap['creation']
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {snap['time']} {Colors.DIM}(created: {creation_display}){Colors.ENDC}")
while True:
try:
choice = input(f"\n{Colors.BOLD}Select snapshot (1-{len(snapshots)}): {Colors.ENDC}")
idx = int(choice) - 1
if 0 <= idx < len(snapshots):
selected_snap = snapshots[idx]
# Confirm selection
print(f"\n{Colors.YELLOW}Selected: {month_name} {day}, {year} at {selected_snap['time']}{Colors.ENDC}")
print(f"{Colors.DIM}Snapshot: {selected_snap['name']}{Colors.ENDC}")
confirm = input(f"{Colors.BOLD}Proceed with this snapshot? (y/N): {Colors.ENDC}")
if confirm.lower() == 'y':
return selected_snap['name']
else:
return self.select_snapshot_by_date(selected_snap['name'].split('@')[0]) # Start over
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
except (ValueError, KeyboardInterrupt):
return None
def clone_snapshot(self, snapshot: str, dataset: str) -> Tuple[bool, str]:
"""Clone snapshot to PDGdrill naming"""
dataset_name = dataset.split('/')[-1]
clone_name = f"{'/'.join(dataset.split('/')[:-1])}/{self.clone_prefix}{dataset_name}" if '/' in dataset else f"{self.clone_prefix}{dataset_name}"
# Check if clone already exists and destroy it
success, _ = self.run_command(f"zfs list {clone_name}")
if success:
print(f"{Colors.YELLOW}Existing drill clone found, removing...{Colors.ENDC}")
destroy_success, destroy_msg = self.run_command(f"zfs destroy {clone_name}")
if not destroy_success:
return False, f"Failed to remove existing clone: {destroy_msg}"
# Create new clone
success, output = self.run_command(f"zfs clone {snapshot} {clone_name}")
if success:
return True, f"Clone created: {clone_name}"
else:
return False, f"Failed to create clone: {output}"
def update_nfs_exports(self, clone_name: str) -> Tuple[bool, str]:
"""Add clone to NFS exports if not already present"""
# Use full pool path for NFS export
export_path = f"/{clone_name}"
# Check if clone is already in exports
if os.path.exists(self.nfs_exports_file):
with open(self.nfs_exports_file, 'r') as f:
content = f.read()
if export_path in content:
return True, "NFS export already exists"
# Add to exports with full path
export_line = f"{export_path} *(rw,sync,no_root_squash,no_subtree_check)\n"
try:
with open(self.nfs_exports_file, 'a') as f:
f.write(export_line)
# Reload NFS exports
success, output = self.run_command("exportfs -ra")
if success:
return True, f"NFS export added: {export_path}"
else:
return False, f"Failed to reload NFS exports: {output}"
except Exception as e:
return False, f"Failed to update exports file: {str(e)}"
def run_activation_drill(self):
"""Main activation drill workflow"""
print(f"\n{Colors.BOLD}🔄 Starting Activation Drill{Colors.ENDC}")
# Select service/dataset
dataset = self.select_dataset()
if not dataset:
return
# Select snapshot
snapshot = self.select_snapshot_by_date(dataset)
if not snapshot:
return
print(f"\n{Colors.YELLOW}Preparing activation drill...{Colors.ENDC}")
# Clone snapshot
self.show_loading("Creating drill clone")
success, message = self.clone_snapshot(snapshot, dataset)
if not success:
print(f"{Colors.RED}{message}{Colors.ENDC}")
return
clone_name = message.split(': ')[1] # Extract clone name from success message
print(f"{Colors.GREEN}{message}{Colors.ENDC}")
# Update NFS exports
self.show_loading("Updating NFS exports")
success, message = self.update_nfs_exports(clone_name)
if success:
print(f"{Colors.GREEN}{message}{Colors.ENDC}")
else:
print(f"{Colors.YELLOW}{message}{Colors.ENDC}")
print(f"\n{Colors.GREEN}🎉 Activation drill ready!{Colors.ENDC}")
print(f"{Colors.DIM}Clone: {clone_name}{Colors.ENDC}")
print(f"{Colors.DIM}Clients can now connect and mount the drill dataset{Colors.ENDC}")
def run(self):
"""Main application loop"""
try:
os.system('clear' if os.name == 'posix' else 'cls')
self.display_banner()
self.run_activation_drill()
except KeyboardInterrupt:
print(f"\n\n{Colors.DIM}Operation cancelled{Colors.ENDC}")
sys.exit(0)
if __name__ == "__main__":
# Check if running as root
if os.geteuid() != 0:
print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}")
print(f"{Colors.DIM}NFS export updates may require sudo{Colors.ENDC}")
server = ActivationDrillServer()
server.run()