From 6afbddc9ce15fde30d3682115a79b044393b9230 Mon Sep 17 00:00:00 2001 From: sBubshait Date: Tue, 5 Aug 2025 14:13:13 +0300 Subject: [PATCH] feat: basic snapshotting automation --- snapshots/snapshot.py | 330 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 330 insertions(+) create mode 100644 snapshots/snapshot.py diff --git a/snapshots/snapshot.py b/snapshots/snapshot.py new file mode 100644 index 0000000..97bc837 --- /dev/null +++ b/snapshots/snapshot.py @@ -0,0 +1,330 @@ +#!/usr/bin/env python3 +""" +PDG - ZFS Snapshot Automation Tool +A modern CLI interface for managing automated ZFS snapshots via cron jobs. +""" + +import os +import sys +import subprocess +import time +import re +from datetime import datetime +from typing import List, Dict, Optional, Tuple + +class Colors: + """ANSI color codes for terminal styling""" + HEADER = '\033[95m' + BLUE = '\033[94m' + CYAN = '\033[96m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + RED = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + DIM = '\033[2m' + +class PDGSnapshot: + def __init__(self): + self.cron_prefix = "PDGsnapshot-" + self.schedule_options = { + "1": ("hourly", "0 * * * *"), + "2": ("daily", "0 2 * * *"), + "3": ("weekly", "0 2 * * 0"), + "4": ("custom", None) + } + + def display_banner(self): + """Display the PDG ASCII banner""" + banner = f""" +{Colors.CYAN}{Colors.BOLD} + ██████╗ ██████╗ ██████╗ + ██╔══██╗██╔══██╗██╔════╝ + ██████╔╝██║ ██║██║ ███╗ + ██╔═══╝ ██║ ██║██║ ██║ + ██║ ██████╔╝╚██████╔╝ + ╚═╝ ╚═════╝ ╚═════╝ +{Colors.ENDC} +{Colors.DIM} │ Snapshot Automation │{Colors.ENDC} +""" + print(banner) + + def run_command(self, cmd: str, capture_output: bool = True) -> Tuple[bool, str]: + """Execute shell command and return success status and output""" + try: + result = subprocess.run( + cmd, shell=True, capture_output=capture_output, + text=True, check=False + ) + return result.returncode == 0, result.stdout + result.stderr + except Exception as e: + return False, str(e) + + def get_zfs_datasets(self) -> List[str]: + """Retrieve available ZFS datasets""" + success, output = self.run_command("zfs list -H -o name") + if not success: + return [] + return [line.strip() for line in output.split('\n') if line.strip()] + + def get_existing_automations(self) -> Dict[str, Dict]: + """Get all PDG snapshot automations from crontab""" + success, output = self.run_command("crontab -l 2>/dev/null") + automations = {} + + if success: + for line in output.split('\n'): + if self.cron_prefix in line and '#' in line: + parts = line.split('#', 1) + if len(parts) == 2: + cron_schedule = parts[0].strip() + comment = parts[1].strip() + if comment.startswith(self.cron_prefix): + dataset = comment.replace(self.cron_prefix, '') + automations[dataset] = { + 'schedule': cron_schedule, + 'status': 'healthy' + } + + return automations + + def check_automation_health(self, dataset: str) -> Tuple[str, str]: + """Check if automation is running properly""" + # Check if dataset exists + success, _ = self.run_command(f"zfs list {dataset}") + if not success: + return "error", f"Dataset '{dataset}' not found" + + # Check crontab entry + automations = self.get_existing_automations() + if dataset not in automations: + return "error", "Cron job not found" + + return "healthy", "Running normally" + + def show_loading(self, message: str, duration: float = 2.0): + """Display a loading animation""" + chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" + end_time = time.time() + duration + + while time.time() < end_time: + for char in chars: + print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True) + time.sleep(0.1) + if time.time() >= end_time: + break + print(f"\r{' ' * (len(message) + 2)}\r", end="") + + def create_snapshot_command(self, dataset: str, name_format: str) -> str: + """Generate the snapshot command for cron""" + # Replace placeholders in name format + timestamp_cmd = "$(date +'%Y-%m-%d_%H-%M-%S')" + snapshot_name = name_format.replace("{timestamp}", timestamp_cmd) + snapshot_name = snapshot_name.replace("{dataset}", dataset.replace('/', '-')) + + return f"zfs snapshot {dataset}@{snapshot_name}" + + def add_cron_job(self, dataset: str, schedule: str, command: str) -> Tuple[bool, str]: + """Add or update cron job for dataset""" + comment = f"#{self.cron_prefix}{dataset}" + cron_line = f"{schedule} {command} {comment}" + + # Get current crontab + success, current_cron = self.run_command("crontab -l 2>/dev/null") + if not success: + current_cron = "" + + # Remove existing PDG job for this dataset + lines = [] + for line in current_cron.split('\n'): + if line.strip() and not (self.cron_prefix in line and dataset in line): + lines.append(line) + + # Add new job + lines.append(cron_line) + new_cron = '\n'.join(lines) + '\n' + + # Write back to crontab + try: + process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) + process.communicate(input=new_cron) + return process.returncode == 0, "Cron job updated successfully" + except Exception as e: + return False, f"Failed to update crontab: {str(e)}" + + def select_dataset(self) -> Optional[str]: + """Interactive dataset selection""" + datasets = self.get_zfs_datasets() + if not datasets: + print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}") + return None + + print(f"\n{Colors.BOLD}Available ZFS Datasets:{Colors.ENDC}") + for i, dataset in enumerate(datasets, 1): + print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}") + + while True: + try: + choice = input(f"\n{Colors.BOLD}Select dataset (1-{len(datasets)}): {Colors.ENDC}") + idx = int(choice) - 1 + if 0 <= idx < len(datasets): + return datasets[idx] + print(f"{Colors.RED}Invalid selection{Colors.ENDC}") + except (ValueError, KeyboardInterrupt): + return None + + def select_schedule(self) -> Optional[str]: + """Interactive schedule selection""" + print(f"\n{Colors.BOLD}Snapshot Frequency:{Colors.ENDC}") + for key, (name, _) in self.schedule_options.items(): + if name == "custom": + print(f"{Colors.CYAN}{key}.{Colors.ENDC} Custom schedule") + else: + print(f"{Colors.CYAN}{key}.{Colors.ENDC} {name.title()}") + + while True: + choice = input(f"\n{Colors.BOLD}Select frequency (1-4): {Colors.ENDC}") + if choice in self.schedule_options: + name, schedule = self.schedule_options[choice] + if name == "custom": + print(f"\n{Colors.DIM}Format: minute hour day month weekday{Colors.ENDC}") + print(f"{Colors.DIM}Example: '0 */6 * * *' for every 6 hours{Colors.ENDC}") + custom = input(f"{Colors.BOLD}Enter cron schedule: {Colors.ENDC}") + if custom.strip(): + return custom.strip() + else: + return schedule + print(f"{Colors.RED}Invalid selection{Colors.ENDC}") + + def select_naming_format(self, dataset: str) -> str: + """Interactive naming format selection""" + default_format = f"{dataset.replace('/', '-')}-{{timestamp}}" + + print(f"\n{Colors.BOLD}Snapshot Naming:{Colors.ENDC}") + print(f"{Colors.CYAN}1.{Colors.ENDC} Default: {default_format}") + print(f"{Colors.CYAN}2.{Colors.ENDC} Custom format") + + choice = input(f"\n{Colors.BOLD}Select naming (1-2): {Colors.ENDC}") + + if choice == "2": + print(f"\n{Colors.DIM}Use {{timestamp}} for date/time, {{dataset}} for dataset name{Colors.ENDC}") + print(f"{Colors.DIM}Example: backup-{{dataset}}-{{timestamp}}{Colors.ENDC}") + custom = input(f"{Colors.BOLD}Enter format: {Colors.ENDC}") + if custom.strip(): + return custom.strip() + + return default_format + + def create_automation(self): + """Create new snapshot automation""" + print(f"\n{Colors.BOLD}📸 Create New Automation{Colors.ENDC}") + + # Select dataset + dataset = self.select_dataset() + if not dataset: + return + + # Select schedule + schedule = self.select_schedule() + if not schedule: + return + + # Select naming format + name_format = self.select_naming_format(dataset) + + # Create snapshot command + command = self.create_snapshot_command(dataset, name_format) + + print(f"\n{Colors.YELLOW}Creating automation for {dataset}...{Colors.ENDC}") + self.show_loading("Setting up cron job") + + # Add cron job + success, message = self.add_cron_job(dataset, schedule, command) + + if success: + print(f"{Colors.GREEN}✓ Automation created successfully!{Colors.ENDC}") + print(f"{Colors.DIM}Dataset: {dataset}{Colors.ENDC}") + print(f"{Colors.DIM}Schedule: {schedule}{Colors.ENDC}") + else: + print(f"{Colors.RED}✗ Failed to create automation{Colors.ENDC}") + print(f"{Colors.RED}{message}{Colors.ENDC}") + + def show_status(self): + """Display status of all automations""" + automations = self.get_existing_automations() + + if not automations: + print(f"\n{Colors.YELLOW}No automations found{Colors.ENDC}") + return + + print(f"\n{Colors.BOLD}📊 Automation Status{Colors.ENDC}") + + for dataset, info in automations.items(): + status, message = self.check_automation_health(dataset) + + if status == "healthy": + icon = f"{Colors.GREEN}✓{Colors.ENDC}" + status_text = f"{Colors.GREEN}Healthy{Colors.ENDC}" + else: + icon = f"{Colors.RED}✗{Colors.ENDC}" + status_text = f"{Colors.RED}Error{Colors.ENDC}" + + print(f"\n{icon} {Colors.BOLD}{dataset}{Colors.ENDC}") + print(f" Status: {status_text}") + print(f" Schedule: {Colors.DIM}{info['schedule']}{Colors.ENDC}") + + if status == "error": + print(f" Error: {Colors.RED}{message}{Colors.ENDC}") + + def main_menu(self): + """Display main menu and handle user input""" + automations = self.get_existing_automations() + automation_count = len(automations) + + if automation_count > 0: + healthy_count = sum(1 for dataset in automations + if self.check_automation_health(dataset)[0] == "healthy") + print(f"{Colors.GREEN}📈 {healthy_count}/{automation_count} Automations Healthy{Colors.ENDC}") + + print(f"\n{Colors.BOLD}Options:{Colors.ENDC}") + print(f"{Colors.CYAN}1.{Colors.ENDC} View Status") + print(f"{Colors.CYAN}2.{Colors.ENDC} Create New Automation") + print(f"{Colors.CYAN}3.{Colors.ENDC} Exit") + + while True: + choice = input(f"\n{Colors.BOLD}Select option (1-3): {Colors.ENDC}") + + if choice == "1": + self.show_status() + break + elif choice == "2": + self.create_automation() + break + elif choice == "3": + print(f"\n{Colors.DIM}Goodbye!{Colors.ENDC}") + sys.exit(0) + else: + print(f"{Colors.RED}Invalid selection{Colors.ENDC}") + + def run(self): + """Main application loop""" + try: + while True: + os.system('clear' if os.name == 'posix' else 'cls') + self.display_banner() + self.main_menu() + + input(f"\n{Colors.DIM}Press Enter to continue...{Colors.ENDC}") + + except KeyboardInterrupt: + print(f"\n\n{Colors.DIM}Goodbye!{Colors.ENDC}") + sys.exit(0) + +if __name__ == "__main__": + # Check if running as root (recommended for ZFS operations) + if os.geteuid() != 0: + print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}") + print(f"{Colors.DIM}Some ZFS operations may require sudo{Colors.ENDC}") + + app = PDGSnapshot() + app.run() \ No newline at end of file