#!/usr/bin/env python3 """ PDG - ZFS Snapshot Automation Tool A modern CLI interface for managing automated ZFS snapshots via cron jobs. """ import os import sys import subprocess import time import re from datetime import datetime from typing import List, Dict, Optional, Tuple class Colors: """ANSI color codes for terminal styling""" HEADER = '\033[95m' BLUE = '\033[94m' CYAN = '\033[96m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' DIM = '\033[2m' class PDGSnapshot: def __init__(self): self.cron_prefix = "PDGsnapshot-" self.schedule_options = { "1": ("hourly", "0 * * * *"), "2": ("daily", "0 2 * * *"), "3": ("weekly", "0 2 * * 0"), "4": ("custom", None) } def display_banner(self): """Display the PDG ASCII banner""" banner = f""" {Colors.CYAN}{Colors.BOLD} ██████╗ ██████╗ ██████╗ ██╔══██╗██╔══██╗██╔════╝ ██████╔╝██║ ██║██║ ███╗ ██╔═══╝ ██║ ██║██║ ██║ ██║ ██████╔╝╚██████╔╝ ╚═╝ ╚═════╝ ╚═════╝ {Colors.ENDC} {Colors.DIM} │ Snapshot Automation │{Colors.ENDC} """ print(banner) def run_command(self, cmd: str, capture_output: bool = True) -> Tuple[bool, str]: """Execute shell command and return success status and output""" try: result = subprocess.run( cmd, shell=True, capture_output=capture_output, text=True, check=False ) return result.returncode == 0, result.stdout + result.stderr except Exception as e: return False, str(e) def get_zfs_datasets(self) -> List[str]: """Retrieve available ZFS datasets""" success, output = self.run_command("zfs list -H -o name") if not success: return [] return [line.strip() for line in output.split('\n') if line.strip()] def get_existing_automations(self) -> Dict[str, Dict]: """Get all PDG snapshot automations from crontab""" success, output = self.run_command("crontab -l 2>/dev/null") automations = {} if success: for line in output.split('\n'): if self.cron_prefix in line and '#' in line: # Check if line is paused (commented out) is_paused = line.strip().startswith('#') and not line.strip().startswith('##') if is_paused: # Remove leading # to parse line = line.lstrip('#') parts = line.split('#', 1) if len(parts) == 2: cron_schedule = parts[0].strip() comment = parts[1].strip() if comment.startswith(self.cron_prefix): dataset = comment.replace(self.cron_prefix, '') automations[dataset] = { 'schedule': cron_schedule, 'status': 'paused' if is_paused else 'active' } return automations def check_automation_health(self, dataset: str) -> Tuple[str, str]: """Check if automation is running properly""" # Check if dataset exists success, _ = self.run_command(f"zfs list {dataset}") if not success: return "error", f"Dataset '{dataset}' not found" # Check crontab entry automations = self.get_existing_automations() if dataset not in automations: return "error", "Cron job not found" if automations[dataset]['status'] == 'paused': return "paused", "Automation is paused" return "healthy", "Running normally" def show_loading(self, message: str, duration: float = 2.0): """Display a loading animation""" chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" end_time = time.time() + duration while time.time() < end_time: for char in chars: print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True) time.sleep(0.1) if time.time() >= end_time: break print(f"\r{' ' * (len(message) + 2)}\r", end="") def create_snapshot_command(self, dataset: str, name_format: str) -> str: """Generate the snapshot command for cron""" # Replace placeholders in name format timestamp_cmd = "$(date +'%Y-%m-%d_%H-%M-%S')" snapshot_name = name_format.replace("{timestamp}", timestamp_cmd) snapshot_name = snapshot_name.replace("{dataset}", dataset.replace('/', '-')) return f"zfs snapshot {dataset}@{snapshot_name}" def add_cron_job(self, dataset: str, schedule: str, command: str) -> Tuple[bool, str]: """Add or update cron job for dataset""" comment = f"#{self.cron_prefix}{dataset}" cron_line = f"{schedule} {command} {comment}" # Get current crontab success, current_cron = self.run_command("crontab -l 2>/dev/null") if not success: current_cron = "" # Remove existing PDG job for this dataset lines = [] for line in current_cron.split('\n'): if line.strip() and not (self.cron_prefix in line and dataset in line): lines.append(line) # Add new job lines.append(cron_line) new_cron = '\n'.join(lines) + '\n' # Write back to crontab try: process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) process.communicate(input=new_cron) return process.returncode == 0, "Cron job updated successfully" except Exception as e: return False, f"Failed to update crontab: {str(e)}" def pause_cron_job(self, dataset: str) -> Tuple[bool, str]: """Pause cron job by commenting it out""" success, current_cron = self.run_command("crontab -l 2>/dev/null") if not success: return False, "No crontab found" lines = [] job_found = False for line in current_cron.split('\n'): if line.strip() and self.cron_prefix in line and dataset in line: if not line.startswith('#'): lines.append(f"#{line}") # Comment out the line job_found = True else: lines.append(line) # Already paused job_found = True else: lines.append(line) if not job_found: return False, "Automation not found" new_cron = '\n'.join(lines) + '\n' try: process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) process.communicate(input=new_cron) return process.returncode == 0, "Automation paused successfully" except Exception as e: return False, f"Failed to pause automation: {str(e)}" def resume_cron_job(self, dataset: str) -> Tuple[bool, str]: """Resume paused cron job by uncommenting it""" success, current_cron = self.run_command("crontab -l 2>/dev/null") if not success: return False, "No crontab found" lines = [] job_found = False for line in current_cron.split('\n'): if line.strip() and self.cron_prefix in line and dataset in line: if line.startswith('#') and not line.startswith('##'): lines.append(line[1:]) # Remove comment job_found = True else: lines.append(line) # Already active job_found = True else: lines.append(line) if not job_found: return False, "Automation not found" new_cron = '\n'.join(lines) + '\n' try: process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) process.communicate(input=new_cron) return process.returncode == 0, "Automation resumed successfully" except Exception as e: return False, f"Failed to resume automation: {str(e)}" def delete_cron_job(self, dataset: str) -> Tuple[bool, str]: """Delete cron job for dataset""" success, current_cron = self.run_command("crontab -l 2>/dev/null") if not success: return False, "No crontab found" lines = [] job_found = False for line in current_cron.split('\n'): if line.strip() and self.cron_prefix in line and dataset in line: job_found = True # Skip this line (delete it) continue else: lines.append(line) if not job_found: return False, "Automation not found" new_cron = '\n'.join(lines) + '\n' try: process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) process.communicate(input=new_cron) return process.returncode == 0, "Automation deleted successfully" except Exception as e: return False, f"Failed to delete automation: {str(e)}" def select_dataset(self) -> Optional[str]: """Interactive dataset selection""" datasets = self.get_zfs_datasets() if not datasets: print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}") return None # If too many datasets, ask for manual input if len(datasets) > 100: print(f"\n{Colors.YELLOW}Found {len(datasets)} datasets - too many to list{Colors.ENDC}") print(f"{Colors.DIM}Tip: Use 'zfs list' to see all available datasets{Colors.ENDC}") while True: dataset = input(f"\n{Colors.BOLD}Enter dataset name: {Colors.ENDC}").strip() if not dataset: return None # Check if dataset exists success, _ = self.run_command(f"zfs list {dataset}") if success: return dataset else: print(f"{Colors.RED}✗ Dataset '{dataset}' not found{Colors.ENDC}") retry = input(f"{Colors.DIM}Try again? (y/n): {Colors.ENDC}").lower() if retry != 'y': return None else: # Show numbered list for reasonable amount of datasets print(f"\n{Colors.BOLD}Available ZFS Datasets:{Colors.ENDC}") for i, dataset in enumerate(datasets, 1): print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}") while True: try: choice = input(f"\n{Colors.BOLD}Select dataset (1-{len(datasets)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(datasets): return datasets[idx] print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None def select_schedule(self) -> Optional[str]: """Interactive schedule selection""" print(f"\n{Colors.BOLD}Snapshot Frequency:{Colors.ENDC}") for key, (name, _) in self.schedule_options.items(): if name == "custom": print(f"{Colors.CYAN}{key}.{Colors.ENDC} Custom schedule") else: print(f"{Colors.CYAN}{key}.{Colors.ENDC} {name.title()}") while True: choice = input(f"\n{Colors.BOLD}Select frequency (1-4): {Colors.ENDC}") if choice in self.schedule_options: name, schedule = self.schedule_options[choice] if name == "custom": print(f"\n{Colors.DIM}Format: minute hour day month weekday{Colors.ENDC}") print(f"{Colors.DIM}Example: '0 */6 * * *' for every 6 hours{Colors.ENDC}") custom = input(f"{Colors.BOLD}Enter cron schedule: {Colors.ENDC}") if custom.strip(): return custom.strip() else: return schedule print(f"{Colors.RED}Invalid selection{Colors.ENDC}") def select_naming_format(self, dataset: str) -> str: """Interactive naming format selection""" default_format = f"{dataset.replace('/', '-')}-{{timestamp}}" print(f"\n{Colors.BOLD}Snapshot Naming:{Colors.ENDC}") print(f"{Colors.CYAN}1.{Colors.ENDC} Default: {default_format}") print(f"{Colors.CYAN}2.{Colors.ENDC} Custom format") choice = input(f"\n{Colors.BOLD}Select naming (1-2): {Colors.ENDC}") if choice == "2": print(f"\n{Colors.DIM}Use {{timestamp}} for date/time, {{dataset}} for dataset name{Colors.ENDC}") print(f"{Colors.DIM}Example: backup-{{dataset}}-{{timestamp}}{Colors.ENDC}") custom = input(f"{Colors.BOLD}Enter format: {Colors.ENDC}") if custom.strip(): return custom.strip() return default_format def manage_automations(self): """Manage existing automations (pause/resume/delete)""" automations = self.get_existing_automations() if not automations: print(f"\n{Colors.YELLOW}No automations found to manage{Colors.ENDC}") return print(f"\n{Colors.BOLD}🔧 Manage Automations{Colors.ENDC}") datasets = list(automations.keys()) # Show automations with status for i, dataset in enumerate(datasets, 1): status, _ = self.check_automation_health(dataset) if status == "healthy": status_icon = f"{Colors.GREEN}●{Colors.ENDC}" elif status == "paused": status_icon = f"{Colors.YELLOW}⏸{Colors.ENDC}" else: status_icon = f"{Colors.RED}✗{Colors.ENDC}" print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {status_icon} {dataset}") # Select automation to manage while True: try: choice = input(f"\n{Colors.BOLD}Select automation (1-{len(datasets)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(datasets): selected_dataset = datasets[idx] break print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return # Show management options status, _ = self.check_automation_health(selected_dataset) print(f"\n{Colors.BOLD}Manage '{selected_dataset}':{Colors.ENDC}") options = [] if status == "healthy": options.append(("1", "Pause automation", "pause")) elif status == "paused": options.append(("1", "Resume automation", "resume")) options.append(("2", f"{Colors.RED}Delete automation{Colors.ENDC}", "delete")) options.append(("3", "Cancel", "cancel")) for num, desc, _ in options: print(f"{Colors.CYAN}{num}.{Colors.ENDC} {desc}") # Handle action while True: action_choice = input(f"\n{Colors.BOLD}Select action: {Colors.ENDC}") action_map = {opt[0]: opt[2] for opt in options} if action_choice in action_map: action = action_map[action_choice] if action == "cancel": return elif action == "pause": self.show_loading("Pausing automation") success, message = self.pause_cron_job(selected_dataset) elif action == "resume": self.show_loading("Resuming automation") success, message = self.resume_cron_job(selected_dataset) elif action == "delete": confirm = input(f"{Colors.RED}Are you sure? This cannot be undone (y/N): {Colors.ENDC}") if confirm.lower() != 'y': return self.show_loading("Deleting automation") success, message = self.delete_cron_job(selected_dataset) if success: print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}") else: print(f"{Colors.RED}✗ {message}{Colors.ENDC}") return print(f"{Colors.RED}Invalid selection{Colors.ENDC}") def show_status(self): """Display status of all automations""" automations = self.get_existing_automations() if not automations: print(f"\n{Colors.YELLOW}No automations found{Colors.ENDC}") return print(f"\n{Colors.BOLD}📊 Automation Status{Colors.ENDC}") for dataset, info in automations.items(): status, message = self.check_automation_health(dataset) if status == "healthy": icon = f"{Colors.GREEN}✓{Colors.ENDC}" status_text = f"{Colors.GREEN}Active{Colors.ENDC}" elif status == "paused": icon = f"{Colors.YELLOW}⏸{Colors.ENDC}" status_text = f"{Colors.YELLOW}Paused{Colors.ENDC}" else: icon = f"{Colors.RED}✗{Colors.ENDC}" status_text = f"{Colors.RED}Error{Colors.ENDC}" print(f"\n{icon} {Colors.BOLD}{dataset}{Colors.ENDC}") print(f" Status: {status_text}") print(f" Schedule: {Colors.DIM}{info['schedule']}{Colors.ENDC}") if status == "error": print(f" Error: {Colors.RED}{message}{Colors.ENDC}") def create_automation(self): """Create new snapshot automation""" print(f"\n{Colors.BOLD}📸 Create New Automation{Colors.ENDC}") # Select dataset dataset = self.select_dataset() if not dataset: return # Select schedule schedule = self.select_schedule() if not schedule: return # Select naming format name_format = self.select_naming_format(dataset) # Create snapshot command command = self.create_snapshot_command(dataset, name_format) print(f"\n{Colors.YELLOW}Creating automation for {dataset}...{Colors.ENDC}") self.show_loading("Setting up cron job") # Add cron job success, message = self.add_cron_job(dataset, schedule, command) if success: print(f"{Colors.GREEN}✓ Automation created successfully!{Colors.ENDC}") print(f"{Colors.DIM}Dataset: {dataset}{Colors.ENDC}") print(f"{Colors.DIM}Schedule: {schedule}{Colors.ENDC}") else: print(f"{Colors.RED}✗ Failed to create automation{Colors.ENDC}") print(f"{Colors.RED}{message}{Colors.ENDC}") def main_menu(self): """Display main menu and handle user input""" automations = self.get_existing_automations() automation_count = len(automations) if automation_count > 0: healthy_count = sum(1 for dataset in automations if self.check_automation_health(dataset)[0] == "healthy") paused_count = sum(1 for dataset in automations if self.check_automation_health(dataset)[0] == "paused") status_parts = [] if healthy_count > 0: status_parts.append(f"{Colors.GREEN}{healthy_count} Active{Colors.ENDC}") if paused_count > 0: status_parts.append(f"{Colors.YELLOW}{paused_count} Paused{Colors.ENDC}") status_text = " | ".join(status_parts) if status_parts else f"{Colors.RED}0 Running{Colors.ENDC}" print(f"{Colors.BOLD}📈 Automations: {status_text}{Colors.ENDC}") print(f"\n{Colors.BOLD}Options:{Colors.ENDC}") print(f"{Colors.CYAN}1.{Colors.ENDC} View Status") print(f"{Colors.CYAN}2.{Colors.ENDC} Create New Automation") print(f"{Colors.CYAN}3.{Colors.ENDC} Manage Automations") print(f"{Colors.CYAN}4.{Colors.ENDC} Exit") while True: choice = input(f"\n{Colors.BOLD}Select option (1-4): {Colors.ENDC}") if choice == "1": self.show_status() break elif choice == "2": self.create_automation() break elif choice == "3": self.manage_automations() break elif choice == "4": print(f"\n{Colors.DIM}Goodbye!{Colors.ENDC}") sys.exit(0) else: print(f"{Colors.RED}Invalid selection{Colors.ENDC}") def run(self): """Main application loop""" try: while True: os.system('clear' if os.name == 'posix' else 'cls') self.display_banner() self.main_menu() input(f"\n{Colors.DIM}Press Enter to continue...{Colors.ENDC}") except KeyboardInterrupt: print(f"\n\n{Colors.DIM}Goodbye!{Colors.ENDC}") sys.exit(0) if __name__ == "__main__": # Check if running as root (recommended for ZFS operations) if os.geteuid() != 0: print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}") print(f"{Colors.DIM}Some ZFS operations may require sudo{Colors.ENDC}") app = PDGSnapshot() app.run()