#!/usr/bin/env python3 """ PDG - ZFS Snapshot Automation Tool A modern CLI interface for managing automated ZFS snapshots via cron jobs. Author: Saleh Bubshait Email: Saleh@Bubshait.me Version: 1.0.0-beta """ import os import sys import subprocess import time import re from datetime import datetime from typing import List, Dict, Optional, Tuple class Colors: """ANSI color codes for terminal styling""" HEADER = '\033[95m' BLUE = '\033[94m' CYAN = '\033[96m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' DIM = '\033[2m' class PDGSnapshot: def __init__(self): self.cron_prefix = "PDGsnapshot-" self.schedule_options = { "1": ("hourly", "0 * * * *"), "2": ("daily", "0 2 * * *"), "3": ("weekly", "0 2 * * 0"), "4": ("custom", None) } def display_banner(self): """Display the PDG ASCII banner""" banner = f""" {Colors.CYAN}{Colors.BOLD} ██████╗ ██████╗ ██████╗ ██╔══██╗██╔══██╗██╔════╝ ██████╔╝██║ ██║██║ ███╗ ██╔═══╝ ██║ ██║██║ ██║ ██║ ██████╔╝╚██████╔╝ ╚═╝ ╚═════╝ ╚═════╝ {Colors.ENDC} {Colors.DIM} │ Snapshot Automation │{Colors.ENDC} """ print(banner) def run_command(self, cmd: str, capture_output: bool = True) -> Tuple[bool, str]: """Execute shell command and return success status and output""" try: result = subprocess.run( cmd, shell=True, capture_output=capture_output, text=True, check=False ) return result.returncode == 0, result.stdout + result.stderr except Exception as e: return False, str(e) def get_zfs_datasets(self) -> List[str]: """Retrieve available ZFS datasets""" success, output = self.run_command("zfs list -H -o name") if not success: return [] return [line.strip() for line in output.split('\n') if line.strip()] def get_existing_automations(self) -> Dict[str, Dict]: """Get all PDG snapshot automations from crontab""" success, output = self.run_command("crontab -l 2>/dev/null") automations = {} if success: for line in output.split('\n'): if self.cron_prefix in line and '#' in line: # Check if line is paused (commented out) is_paused = line.strip().startswith('#') and not line.strip().startswith('##') if is_paused: # Remove leading # to parse line = line.lstrip('#') parts = line.split('#', 1) if len(parts) == 2: cron_schedule = parts[0].strip() comment = parts[1].strip() if comment.startswith(self.cron_prefix): dataset = comment.replace(self.cron_prefix, '') automations[dataset] = { 'schedule': cron_schedule, 'status': 'paused' if is_paused else 'active' } return automations def check_automation_health(self, dataset: str) -> Tuple[str, str]: """Check if automation is running properly""" # Check if dataset exists success, _ = self.run_command(f"zfs list {dataset}") if not success: return "error", f"Dataset '{dataset}' not found" # Check crontab entry automations = self.get_existing_automations() if dataset not in automations: return "error", "Cron job not found" if automations[dataset]['status'] == 'paused': return "paused", "Automation is paused" return "healthy", "Running normally" def show_loading(self, message: str, duration: float = 2.0): """Display a loading animation""" chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" end_time = time.time() + duration while time.time() < end_time: for char in chars: print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True) time.sleep(0.1) if time.time() >= end_time: break print(f"\r{' ' * (len(message) + 2)}\r", end="") def create_snapshot_command(self, dataset: str, name_format: str) -> str: """Generate the snapshot command for cron""" # Replace placeholders in name format timestamp_cmd = "$(date +'%Y-%m-%d_%H-%M-%S')" snapshot_name = name_format.replace("{timestamp}", timestamp_cmd) snapshot_name = snapshot_name.replace("{dataset}", dataset.replace('/', '-')) return f"zfs snapshot {dataset}@{snapshot_name}" def add_cron_job(self, dataset: str, schedule: str, command: str) -> Tuple[bool, str]: """Add or update cron job for dataset""" comment = f"#{self.cron_prefix}{dataset}" cron_line = f"{schedule} {command} {comment}" # Get current crontab success, current_cron = self.run_command("crontab -l 2>/dev/null") if not success: current_cron = "" # Remove existing PDG job for this dataset lines = [] for line in current_cron.split('\n'): if line.strip() and not (self.cron_prefix in line and dataset in line): lines.append(line) # Add new job lines.append(cron_line) new_cron = '\n'.join(lines) + '\n' # Write back to crontab try: process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) process.communicate(input=new_cron) return process.returncode == 0, "Cron job updated successfully" except Exception as e: return False, f"Failed to update crontab: {str(e)}" def pause_cron_job(self, dataset: str) -> Tuple[bool, str]: """Pause cron job by commenting it out""" success, current_cron = self.run_command("crontab -l 2>/dev/null") if not success: return False, "No crontab found" lines = [] job_found = False for line in current_cron.split('\n'): if line.strip() and self.cron_prefix in line and dataset in line: if not line.startswith('#'): lines.append(f"#{line}") # Comment out the line job_found = True else: lines.append(line) # Already paused job_found = True else: lines.append(line) if not job_found: return False, "Automation not found" new_cron = '\n'.join(lines) + '\n' try: process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) process.communicate(input=new_cron) return process.returncode == 0, "Automation paused successfully" except Exception as e: return False, f"Failed to pause automation: {str(e)}" def resume_cron_job(self, dataset: str) -> Tuple[bool, str]: """Resume paused cron job by uncommenting it""" success, current_cron = self.run_command("crontab -l 2>/dev/null") if not success: return False, "No crontab found" lines = [] job_found = False for line in current_cron.split('\n'): if line.strip() and self.cron_prefix in line and dataset in line: if line.startswith('#') and not line.startswith('##'): lines.append(line[1:]) # Remove comment job_found = True else: lines.append(line) # Already active job_found = True else: lines.append(line) if not job_found: return False, "Automation not found" new_cron = '\n'.join(lines) + '\n' try: process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) process.communicate(input=new_cron) return process.returncode == 0, "Automation resumed successfully" except Exception as e: return False, f"Failed to resume automation: {str(e)}" def delete_cron_job(self, dataset: str) -> Tuple[bool, str]: """Delete cron job for dataset""" success, current_cron = self.run_command("crontab -l 2>/dev/null") if not success: return False, "No crontab found" lines = [] job_found = False for line in current_cron.split('\n'): if line.strip() and self.cron_prefix in line and dataset in line: job_found = True # Skip this line (delete it) continue else: lines.append(line) if not job_found: return False, "Automation not found" new_cron = '\n'.join(lines) + '\n' try: process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) process.communicate(input=new_cron) return process.returncode == 0, "Automation deleted successfully" except Exception as e: return False, f"Failed to delete automation: {str(e)}" def select_dataset(self) -> Optional[str]: """Interactive dataset selection""" datasets = self.get_zfs_datasets() if not datasets: print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}") return None # If too many datasets, ask for manual input if len(datasets) > 100: print(f"\n{Colors.YELLOW}Found {len(datasets)} datasets - too many to list{Colors.ENDC}") print(f"{Colors.DIM}Tip: Use 'zfs list' to see all available datasets{Colors.ENDC}") while True: dataset = input(f"\n{Colors.BOLD}Enter dataset name: {Colors.ENDC}").strip() if not dataset: return None # Check if dataset exists success, _ = self.run_command(f"zfs list {dataset}") if success: return dataset else: print(f"{Colors.RED}✗ Dataset '{dataset}' not found{Colors.ENDC}") retry = input(f"{Colors.DIM}Try again? (y/n): {Colors.ENDC}").lower() if retry != 'y': return None else: # Show numbered list for reasonable amount of datasets print(f"\n{Colors.BOLD}Available ZFS Datasets:{Colors.ENDC}") for i, dataset in enumerate(datasets, 1): print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}") while True: try: choice = input(f"\n{Colors.BOLD}Select dataset (1-{len(datasets)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(datasets): return datasets[idx] print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return None def select_schedule(self) -> Optional[str]: """Interactive schedule selection""" print(f"\n{Colors.BOLD}Snapshot Frequency:{Colors.ENDC}") for key, (name, _) in self.schedule_options.items(): if name == "custom": print(f"{Colors.CYAN}{key}.{Colors.ENDC} Custom schedule") else: print(f"{Colors.CYAN}{key}.{Colors.ENDC} {name.title()}") while True: choice = input(f"\n{Colors.BOLD}Select frequency (1-4): {Colors.ENDC}") if choice in self.schedule_options: name, schedule = self.schedule_options[choice] if name == "custom": print(f"\n{Colors.DIM}Format: minute hour day month weekday{Colors.ENDC}") print(f"{Colors.DIM}Example: '0 */6 * * *' for every 6 hours{Colors.ENDC}") custom = input(f"{Colors.BOLD}Enter cron schedule: {Colors.ENDC}") if custom.strip(): return custom.strip() else: return schedule print(f"{Colors.RED}Invalid selection{Colors.ENDC}") def select_naming_format(self, dataset: str) -> str: """Interactive naming format selection""" default_format = f"{dataset.replace('/', '-')}-{{timestamp}}" print(f"\n{Colors.BOLD}Snapshot Naming:{Colors.ENDC}") print(f"{Colors.CYAN}1.{Colors.ENDC} Default: {default_format}") print(f"{Colors.CYAN}2.{Colors.ENDC} Custom format") choice = input(f"\n{Colors.BOLD}Select naming (1-2): {Colors.ENDC}") if choice == "2": print(f"\n{Colors.DIM}Use {{timestamp}} for date/time, {{dataset}} for dataset name{Colors.ENDC}") print(f"{Colors.DIM}Example: backup-{{dataset}}-{{timestamp}}{Colors.ENDC}") custom = input(f"{Colors.BOLD}Enter format: {Colors.ENDC}") if custom.strip(): return custom.strip() return default_format def manage_automations(self): """Manage existing automations (pause/resume/delete)""" automations = self.get_existing_automations() if not automations: print(f"\n{Colors.YELLOW}No automations found to manage{Colors.ENDC}") return print(f"\n{Colors.BOLD}🔧 Manage Automations{Colors.ENDC}") datasets = list(automations.keys()) # Show automations with status for i, dataset in enumerate(datasets, 1): status, _ = self.check_automation_health(dataset) if status == "healthy": status_icon = f"{Colors.GREEN}●{Colors.ENDC}" elif status == "paused": status_icon = f"{Colors.YELLOW}⏸{Colors.ENDC}" else: status_icon = f"{Colors.RED}✗{Colors.ENDC}" print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {status_icon} {dataset}") # Select automation to manage while True: try: choice = input(f"\n{Colors.BOLD}Select automation (1-{len(datasets)}): {Colors.ENDC}") idx = int(choice) - 1 if 0 <= idx < len(datasets): selected_dataset = datasets[idx] break print(f"{Colors.RED}Invalid selection{Colors.ENDC}") except (ValueError, KeyboardInterrupt): return # Show management options status, _ = self.check_automation_health(selected_dataset) print(f"\n{Colors.BOLD}Manage '{selected_dataset}':{Colors.ENDC}") options = [] if status == "healthy": options.append(("1", "Pause automation", "pause")) elif status == "paused": options.append(("1", "Resume automation", "resume")) options.append(("2", f"{Colors.RED}Delete automation{Colors.ENDC}", "delete")) options.append(("3", "Cancel", "cancel")) for num, desc, _ in options: print(f"{Colors.CYAN}{num}.{Colors.ENDC} {desc}") # Handle action while True: action_choice = input(f"\n{Colors.BOLD}Select action: {Colors.ENDC}") action_map = {opt[0]: opt[2] for opt in options} if action_choice in action_map: action = action_map[action_choice] if action == "cancel": return elif action == "pause": self.show_loading("Pausing automation") success, message = self.pause_cron_job(selected_dataset) elif action == "resume": self.show_loading("Resuming automation") success, message = self.resume_cron_job(selected_dataset) elif action == "delete": confirm = input(f"{Colors.RED}Are you sure? This cannot be undone (y/N): {Colors.ENDC}") if confirm.lower() != 'y': return self.show_loading("Deleting automation") success, message = self.delete_cron_job(selected_dataset) if success: print(f"{Colors.GREEN}✓ {message}{Colors.ENDC}") else: print(f"{Colors.RED}✗ {message}{Colors.ENDC}") return print(f"{Colors.RED}Invalid selection{Colors.ENDC}") def show_status(self): """Display status of all automations""" automations = self.get_existing_automations() if not automations: print(f"\n{Colors.YELLOW}No automations found{Colors.ENDC}") return print(f"\n{Colors.BOLD}📊 Automation Status{Colors.ENDC}") for dataset, info in automations.items(): status, message = self.check_automation_health(dataset) if status == "healthy": icon = f"{Colors.GREEN}✓{Colors.ENDC}" status_text = f"{Colors.GREEN}Active{Colors.ENDC}" elif status == "paused": icon = f"{Colors.YELLOW}⏸{Colors.ENDC}" status_text = f"{Colors.YELLOW}Paused{Colors.ENDC}" else: icon = f"{Colors.RED}✗{Colors.ENDC}" status_text = f"{Colors.RED}Error{Colors.ENDC}" print(f"\n{icon} {Colors.BOLD}{dataset}{Colors.ENDC}") print(f" Status: {status_text}") print(f" Schedule: {Colors.DIM}{info['schedule']}{Colors.ENDC}") if status == "error": print(f" Error: {Colors.RED}{message}{Colors.ENDC}") def create_automation(self): """Create new snapshot automation""" print(f"\n{Colors.BOLD}📸 Create New Automation{Colors.ENDC}") # Select dataset dataset = self.select_dataset() if not dataset: return # Select schedule schedule = self.select_schedule() if not schedule: return # Select naming format name_format = self.select_naming_format(dataset) # Create snapshot command command = self.create_snapshot_command(dataset, name_format) print(f"\n{Colors.YELLOW}Creating automation for {dataset}...{Colors.ENDC}") self.show_loading("Setting up cron job") # Add cron job success, message = self.add_cron_job(dataset, schedule, command) if success: print(f"{Colors.GREEN}✓ Automation created successfully!{Colors.ENDC}") print(f"{Colors.DIM}Dataset: {dataset}{Colors.ENDC}") print(f"{Colors.DIM}Schedule: {schedule}{Colors.ENDC}") else: print(f"{Colors.RED}✗ Failed to create automation{Colors.ENDC}") print(f"{Colors.RED}{message}{Colors.ENDC}") def main_menu(self): """Display main menu and handle user input""" automations = self.get_existing_automations() automation_count = len(automations) if automation_count > 0: healthy_count = sum(1 for dataset in automations if self.check_automation_health(dataset)[0] == "healthy") paused_count = sum(1 for dataset in automations if self.check_automation_health(dataset)[0] == "paused") status_parts = [] if healthy_count > 0: status_parts.append(f"{Colors.GREEN}{healthy_count} Active{Colors.ENDC}") if paused_count > 0: status_parts.append(f"{Colors.YELLOW}{paused_count} Paused{Colors.ENDC}") status_text = " | ".join(status_parts) if status_parts else f"{Colors.RED}0 Running{Colors.ENDC}" print(f"{Colors.BOLD}📈 Automations: {status_text}{Colors.ENDC}") print(f"\n{Colors.BOLD}Options:{Colors.ENDC}") print(f"{Colors.CYAN}1.{Colors.ENDC} View Status") print(f"{Colors.CYAN}2.{Colors.ENDC} Create New Automation") print(f"{Colors.CYAN}3.{Colors.ENDC} Manage Automations") print(f"{Colors.CYAN}4.{Colors.ENDC} Exit") while True: choice = input(f"\n{Colors.BOLD}Select option (1-4): {Colors.ENDC}") if choice == "1": self.show_status() break elif choice == "2": self.create_automation() break elif choice == "3": self.manage_automations() break elif choice == "4": print(f"\n{Colors.DIM}Goodbye!{Colors.ENDC}") sys.exit(0) else: print(f"{Colors.RED}Invalid selection{Colors.ENDC}") def run(self): """Main application loop""" try: while True: os.system('clear' if os.name == 'posix' else 'cls') self.display_banner() self.main_menu() input(f"\n{Colors.DIM}Press Enter to continue...{Colors.ENDC}") except KeyboardInterrupt: print(f"\n\n{Colors.DIM}Goodbye!{Colors.ENDC}") sys.exit(0) if __name__ == "__main__": # Check if running as root (recommended for ZFS operations) if os.geteuid() != 0: print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}") print(f"{Colors.DIM}Some ZFS operations may require sudo{Colors.ENDC}") app = PDGSnapshot() app.run()