#!/usr/bin/env python3 """ PDG - ZFS Snapshot Automation Tool A modern CLI interface for managing automated ZFS snapshots via cron jobs. """ import os import sys import subprocess import time import re from datetime import datetime from typing import List, Dict, Optional, Tuple class Colors: """ANSI color codes for terminal styling""" HEADER = '\033[95m' BLUE = '\033[94m' CYAN = '\033[96m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' DIM = '\033[2m' class PDGSnapshot: def __init__(self): self.cron_prefix = "PDGsnapshot-" self.schedule_options = { "1": ("hourly", "0 * * * *"), "2": ("daily", "0 2 * * *"), "3": ("weekly", "0 2 * * 0"), "4": ("custom", None) } def display_banner(self): """Display the PDG ASCII banner""" banner = f""" {Colors.CYAN}{Colors.BOLD} ██████╗ ██████╗ ██████╗ ██╔══██╗██╔══██╗██╔════╝ ██████╔╝██║ ██║██║ ███╗ ██╔═══╝ ██║ ██║██║ ██║ ██║ ██████╔╝╚██████╔╝ ╚═╝ ╚═════╝ ╚═════╝ {Colors.ENDC} {Colors.DIM} │ Snapshot Automation │{Colors.ENDC} """ print(banner) def run_command(self, cmd: str, capture_output: bool = True) -> Tuple[bool, str]: """Execute shell command and return success status and output""" try: result = subprocess.run( cmd, shell=True, capture_output=capture_output, text=True, check=False ) return result.returncode == 0, result.stdout + result.stderr except Exception as e: return False, str(e) def get_zfs_datasets(self) -> List[str]: """Retrieve available ZFS datasets""" success, output = self.run_command("zfs list -H -o name") if not success: return [] return [line.strip() for line in output.split('\n') if line.strip()] def get_existing_automations(self) -> Dict[str, Dict]: """Get all PDG snapshot automations from crontab""" success, output = self.run_command("crontab -l 2>/dev/null") automations = {} if success: for line in output.split('\n'): if self.cron_prefix in line and '#' in line: parts = line.split('#', 1) if len(parts) == 2: cron_schedule = parts[0].strip() comment = parts[1].strip() if comment.startswith(self.cron_prefix): dataset = comment.replace(self.cron_prefix, '') automations[dataset] = { 'schedule': cron_schedule, 'status': 'healthy' } return automations def check_automation_health(self, dataset: str) -> Tuple[str, str]: """Check if automation is running properly""" # Check if dataset exists success, _ = self.run_command(f"zfs list {dataset}") if not success: return "error", f"Dataset '{dataset}' not found" # Check crontab entry automations = self.get_existing_automations() if dataset not in automations: return "error", "Cron job not found" return "healthy", "Running normally" def show_loading(self, message: str, duration: float = 2.0): """Display a loading animation""" chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" end_time = time.time() + duration while time.time() < end_time: for char in chars: print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True) time.sleep(0.1) if time.time() >= end_time: break print(f"\r{' ' * (len(message) + 2)}\r", end="") def create_snapshot_command(self, dataset: str, name_format: str) -> str: """Generate the snapshot command for cron""" # Replace placeholders in name format timestamp_cmd = "$(date +'%Y-%m-%d_%H-%M-%S')" snapshot_name = name_format.replace("{timestamp}", timestamp_cmd) snapshot_name = snapshot_name.replace("{dataset}", dataset.replace('/', '-')) return f"zfs snapshot {dataset}@{snapshot_name}" def add_cron_job(self, dataset: str, schedule: str, command: str) -> Tuple[bool, str]: """Add or update cron job for dataset""" comment = f"#{self.cron_prefix}{dataset}" cron_line = f"{schedule} {command} {comment}" # Get current crontab success, current_cron = self.run_command("crontab -l 2>/dev/null") if not success: current_cron = "" # Remove existing PDG job for this dataset lines = [] for line in current_cron.split('\n'): if line.strip() and not (self.cron_prefix in line and dataset in line): lines.append(line) # Add new job lines.append(cron_line) new_cron = '\n'.join(lines) + '\n' # Write back to crontab try: process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) process.communicate(input=new_cron) return process.returncode == 0, "Cron job updated successfully" except Exception as e: return False, f"Failed to update crontab: {str(e)}" def select_dataset(self) -> Optional[str]: """Interactive dataset selection""" datasets = self.get_zfs_datasets() if not datasets: print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}") return None print(f"\n{Colors.BOLD}Available ZFS Datasets:{Colors.ENDC}") for i, dataset in enumerate(datasets, 1): print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}") while True: try: choice = input(f"\n{Colors.BOLD}Select dataset (1-{len(datasets)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(datasets): return datasets[idx] print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None def select_schedule(self) -> Optional[str]: """Interactive schedule selection""" print(f"\n{Colors.BOLD}Snapshot Frequency:{Colors.ENDC}") for key, (name, _) in self.schedule_options.items(): if name == "custom": print(f"{Colors.CYAN}{key}.{Colors.ENDC} Custom schedule") else: print(f"{Colors.CYAN}{key}.{Colors.ENDC} {name.title()}") while True: choice = input(f"\n{Colors.BOLD}Select frequency (1-4): {Colors.ENDC}") if choice in self.schedule_options: name, schedule = self.schedule_options[choice] if name == "custom": print(f"\n{Colors.DIM}Format: minute hour day month weekday{Colors.ENDC}") print(f"{Colors.DIM}Example: '0 */6 * * *' for every 6 hours{Colors.ENDC}") custom = input(f"{Colors.BOLD}Enter cron schedule: {Colors.ENDC}") if custom.strip(): return custom.strip() else: return schedule print(f"{Colors.RED}Invalid selection{Colors.ENDC}") def select_naming_format(self, dataset: str) -> str: """Interactive naming format selection""" default_format = f"{dataset.replace('/', '-')}-{{timestamp}}" print(f"\n{Colors.BOLD}Snapshot Naming:{Colors.ENDC}") print(f"{Colors.CYAN}1.{Colors.ENDC} Default: {default_format}") print(f"{Colors.CYAN}2.{Colors.ENDC} Custom format") choice = input(f"\n{Colors.BOLD}Select naming (1-2): {Colors.ENDC}") if choice == "2": print(f"\n{Colors.DIM}Use {{timestamp}} for date/time, {{dataset}} for dataset name{Colors.ENDC}") print(f"{Colors.DIM}Example: backup-{{dataset}}-{{timestamp}}{Colors.ENDC}") custom = input(f"{Colors.BOLD}Enter format: {Colors.ENDC}") if custom.strip(): return custom.strip() return default_format def create_automation(self): """Create new snapshot automation""" print(f"\n{Colors.BOLD}📸 Create New Automation{Colors.ENDC}") # Select dataset dataset = self.select_dataset() if not dataset: return # Select schedule schedule = self.select_schedule() if not schedule: return # Select naming format name_format = self.select_naming_format(dataset) # Create snapshot command command = self.create_snapshot_command(dataset, name_format) print(f"\n{Colors.YELLOW}Creating automation for {dataset}...{Colors.ENDC}") self.show_loading("Setting up cron job") # Add cron job success, message = self.add_cron_job(dataset, schedule, command) if success: print(f"{Colors.GREEN}✓ Automation created successfully!{Colors.ENDC}") print(f"{Colors.DIM}Dataset: {dataset}{Colors.ENDC}") print(f"{Colors.DIM}Schedule: {schedule}{Colors.ENDC}") else: print(f"{Colors.RED}✗ Failed to create automation{Colors.ENDC}") print(f"{Colors.RED}{message}{Colors.ENDC}") def show_status(self): """Display status of all automations""" automations = self.get_existing_automations() if not automations: print(f"\n{Colors.YELLOW}No automations found{Colors.ENDC}") return print(f"\n{Colors.BOLD}📊 Automation Status{Colors.ENDC}") for dataset, info in automations.items(): status, message = self.check_automation_health(dataset) if status == "healthy": icon = f"{Colors.GREEN}✓{Colors.ENDC}" status_text = f"{Colors.GREEN}Healthy{Colors.ENDC}" else: icon = f"{Colors.RED}✗{Colors.ENDC}" status_text = f"{Colors.RED}Error{Colors.ENDC}" print(f"\n{icon} {Colors.BOLD}{dataset}{Colors.ENDC}") print(f" Status: {status_text}") print(f" Schedule: {Colors.DIM}{info['schedule']}{Colors.ENDC}") if status == "error": print(f" Error: {Colors.RED}{message}{Colors.ENDC}") def main_menu(self): """Display main menu and handle user input""" automations = self.get_existing_automations() automation_count = len(automations) if automation_count > 0: healthy_count = sum(1 for dataset in automations if self.check_automation_health(dataset)[0] == "healthy") print(f"{Colors.GREEN}📈 {healthy_count}/{automation_count} Automations Healthy{Colors.ENDC}") print(f"\n{Colors.BOLD}Options:{Colors.ENDC}") print(f"{Colors.CYAN}1.{Colors.ENDC} View Status") print(f"{Colors.CYAN}2.{Colors.ENDC} Create New Automation") print(f"{Colors.CYAN}3.{Colors.ENDC} Exit") while True: choice = input(f"\n{Colors.BOLD}Select option (1-3): {Colors.ENDC}") if choice == "1": self.show_status() break elif choice == "2": self.create_automation() break elif choice == "3": print(f"\n{Colors.DIM}Goodbye!{Colors.ENDC}") sys.exit(0) else: print(f"{Colors.RED}Invalid selection{Colors.ENDC}") def run(self): """Main application loop""" try: while True: os.system('clear' if os.name == 'posix' else 'cls') self.display_banner() self.main_menu() input(f"\n{Colors.DIM}Press Enter to continue...{Colors.ENDC}") except KeyboardInterrupt: print(f"\n\n{Colors.DIM}Goodbye!{Colors.ENDC}") sys.exit(0) if __name__ == "__main__": # Check if running as root (recommended for ZFS operations) if os.geteuid() != 0: print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}") print(f"{Colors.DIM}Some ZFS operations may require sudo{Colors.ENDC}") app = PDGSnapshot() app.run()