PDG/snapshots/snapshot.py

558 lines
22 KiB
Python

#!/usr/bin/env python3
"""
PDG - ZFS Snapshot Automation Tool
A modern CLI interface for managing automated ZFS snapshots via cron jobs.
Author: Saleh Bubshait
Email: Saleh@Bubshait.me
Version: 1.0.0-beta
"""
import os
import sys
import subprocess
import time
import re
from datetime import datetime
from typing import List, Dict, Optional, Tuple
class Colors:
"""ANSI color codes for terminal styling"""
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
DIM = '\033[2m'
class PDGSnapshot:
def __init__(self):
self.cron_prefix = "PDGsnapshot-"
self.schedule_options = {
"1": ("hourly", "0 * * * *"),
"2": ("daily", "0 2 * * *"),
"3": ("weekly", "0 2 * * 0"),
"4": ("custom", None)
}
def display_banner(self):
"""Display the PDG ASCII banner"""
banner = f"""
{Colors.CYAN}{Colors.BOLD}
██████╗ ██████╗ ██████╗
██╔══██╗██╔══██╗██╔════╝
██████╔╝██║ ██║██║ ███╗
██╔═══╝ ██║ ██║██║ ██║
██║ ██████╔╝╚██████╔╝
╚═╝ ╚═════╝ ╚═════╝
{Colors.ENDC}
{Colors.DIM} │ Snapshot Automation │{Colors.ENDC}
"""
print(banner)
def run_command(self, cmd: str, capture_output: bool = True) -> Tuple[bool, str]:
"""Execute shell command and return success status and output"""
try:
result = subprocess.run(
cmd, shell=True, capture_output=capture_output,
text=True, check=False
)
return result.returncode == 0, result.stdout + result.stderr
except Exception as e:
return False, str(e)
def get_zfs_datasets(self) -> List[str]:
"""Retrieve available ZFS datasets"""
success, output = self.run_command("zfs list -H -o name")
if not success:
return []
return [line.strip() for line in output.split('\n') if line.strip()]
def get_existing_automations(self) -> Dict[str, Dict]:
"""Get all PDG snapshot automations from crontab"""
success, output = self.run_command("crontab -l 2>/dev/null")
automations = {}
if success:
for line in output.split('\n'):
if self.cron_prefix in line and '#' in line:
# Check if line is paused (commented out)
is_paused = line.strip().startswith('#') and not line.strip().startswith('##')
if is_paused:
# Remove leading # to parse
line = line.lstrip('#')
parts = line.split('#', 1)
if len(parts) == 2:
cron_schedule = parts[0].strip()
comment = parts[1].strip()
if comment.startswith(self.cron_prefix):
dataset = comment.replace(self.cron_prefix, '')
automations[dataset] = {
'schedule': cron_schedule,
'status': 'paused' if is_paused else 'active'
}
return automations
def check_automation_health(self, dataset: str) -> Tuple[str, str]:
"""Check if automation is running properly"""
# Check if dataset exists
success, _ = self.run_command(f"zfs list {dataset}")
if not success:
return "error", f"Dataset '{dataset}' not found"
# Check crontab entry
automations = self.get_existing_automations()
if dataset not in automations:
return "error", "Cron job not found"
if automations[dataset]['status'] == 'paused':
return "paused", "Automation is paused"
return "healthy", "Running normally"
def show_loading(self, message: str, duration: float = 2.0):
"""Display a loading animation"""
chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
end_time = time.time() + duration
while time.time() < end_time:
for char in chars:
print(f"\r{Colors.CYAN}{char}{Colors.ENDC} {message}", end="", flush=True)
time.sleep(0.1)
if time.time() >= end_time:
break
print(f"\r{' ' * (len(message) + 2)}\r", end="")
def create_snapshot_command(self, dataset: str, name_format: str) -> str:
"""Generate the snapshot command for cron"""
# Replace placeholders in name format
timestamp_cmd = "$(date +'%Y-%m-%d_%H-%M-%S')"
snapshot_name = name_format.replace("{timestamp}", timestamp_cmd)
snapshot_name = snapshot_name.replace("{dataset}", dataset.replace('/', '-'))
return f"zfs snapshot {dataset}@{snapshot_name}"
def add_cron_job(self, dataset: str, schedule: str, command: str) -> Tuple[bool, str]:
"""Add or update cron job for dataset"""
comment = f"#{self.cron_prefix}{dataset}"
cron_line = f"{schedule} {command} {comment}"
# Get current crontab
success, current_cron = self.run_command("crontab -l 2>/dev/null")
if not success:
current_cron = ""
# Remove existing PDG job for this dataset
lines = []
for line in current_cron.split('\n'):
if line.strip() and not (self.cron_prefix in line and dataset in line):
lines.append(line)
# Add new job
lines.append(cron_line)
new_cron = '\n'.join(lines) + '\n'
# Write back to crontab
try:
process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True)
process.communicate(input=new_cron)
return process.returncode == 0, "Cron job updated successfully"
except Exception as e:
return False, f"Failed to update crontab: {str(e)}"
def pause_cron_job(self, dataset: str) -> Tuple[bool, str]:
"""Pause cron job by commenting it out"""
success, current_cron = self.run_command("crontab -l 2>/dev/null")
if not success:
return False, "No crontab found"
lines = []
job_found = False
for line in current_cron.split('\n'):
if line.strip() and self.cron_prefix in line and dataset in line:
if not line.startswith('#'):
lines.append(f"#{line}") # Comment out the line
job_found = True
else:
lines.append(line) # Already paused
job_found = True
else:
lines.append(line)
if not job_found:
return False, "Automation not found"
new_cron = '\n'.join(lines) + '\n'
try:
process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True)
process.communicate(input=new_cron)
return process.returncode == 0, "Automation paused successfully"
except Exception as e:
return False, f"Failed to pause automation: {str(e)}"
def resume_cron_job(self, dataset: str) -> Tuple[bool, str]:
"""Resume paused cron job by uncommenting it"""
success, current_cron = self.run_command("crontab -l 2>/dev/null")
if not success:
return False, "No crontab found"
lines = []
job_found = False
for line in current_cron.split('\n'):
if line.strip() and self.cron_prefix in line and dataset in line:
if line.startswith('#') and not line.startswith('##'):
lines.append(line[1:]) # Remove comment
job_found = True
else:
lines.append(line) # Already active
job_found = True
else:
lines.append(line)
if not job_found:
return False, "Automation not found"
new_cron = '\n'.join(lines) + '\n'
try:
process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True)
process.communicate(input=new_cron)
return process.returncode == 0, "Automation resumed successfully"
except Exception as e:
return False, f"Failed to resume automation: {str(e)}"
def delete_cron_job(self, dataset: str) -> Tuple[bool, str]:
"""Delete cron job for dataset"""
success, current_cron = self.run_command("crontab -l 2>/dev/null")
if not success:
return False, "No crontab found"
lines = []
job_found = False
for line in current_cron.split('\n'):
if line.strip() and self.cron_prefix in line and dataset in line:
job_found = True
# Skip this line (delete it)
continue
else:
lines.append(line)
if not job_found:
return False, "Automation not found"
new_cron = '\n'.join(lines) + '\n'
try:
process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True)
process.communicate(input=new_cron)
return process.returncode == 0, "Automation deleted successfully"
except Exception as e:
return False, f"Failed to delete automation: {str(e)}"
def select_dataset(self) -> Optional[str]:
"""Interactive dataset selection"""
datasets = self.get_zfs_datasets()
if not datasets:
print(f"{Colors.RED}✗ No ZFS datasets found{Colors.ENDC}")
return None
# If too many datasets, ask for manual input
if len(datasets) > 100:
print(f"\n{Colors.YELLOW}Found {len(datasets)} datasets - too many to list{Colors.ENDC}")
print(f"{Colors.DIM}Tip: Use 'zfs list' to see all available datasets{Colors.ENDC}")
while True:
dataset = input(f"\n{Colors.BOLD}Enter dataset name: {Colors.ENDC}").strip()
if not dataset:
return None
# Check if dataset exists
success, _ = self.run_command(f"zfs list {dataset}")
if success:
return dataset
else:
print(f"{Colors.RED}✗ Dataset '{dataset}' not found{Colors.ENDC}")
retry = input(f"{Colors.DIM}Try again? (y/n): {Colors.ENDC}").lower()
if retry != 'y':
return None
else:
# Show numbered list for reasonable amount of datasets
print(f"\n{Colors.BOLD}Available ZFS Datasets:{Colors.ENDC}")
for i, dataset in enumerate(datasets, 1):
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {dataset}")
while True:
try:
choice = input(f"\n{Colors.BOLD}Select dataset (1-{len(datasets)}): {Colors.ENDC}")
idx = int(choice) - 1
if 0 <= idx < len(datasets):
return datasets[idx]
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
except (ValueError, KeyboardInterrupt):
return None
def select_schedule(self) -> Optional[str]:
"""Interactive schedule selection"""
print(f"\n{Colors.BOLD}Snapshot Frequency:{Colors.ENDC}")
for key, (name, _) in self.schedule_options.items():
if name == "custom":
print(f"{Colors.CYAN}{key}.{Colors.ENDC} Custom schedule")
else:
print(f"{Colors.CYAN}{key}.{Colors.ENDC} {name.title()}")
while True:
choice = input(f"\n{Colors.BOLD}Select frequency (1-4): {Colors.ENDC}")
if choice in self.schedule_options:
name, schedule = self.schedule_options[choice]
if name == "custom":
print(f"\n{Colors.DIM}Format: minute hour day month weekday{Colors.ENDC}")
print(f"{Colors.DIM}Example: '0 */6 * * *' for every 6 hours{Colors.ENDC}")
custom = input(f"{Colors.BOLD}Enter cron schedule: {Colors.ENDC}")
if custom.strip():
return custom.strip()
else:
return schedule
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
def select_naming_format(self, dataset: str) -> str:
"""Interactive naming format selection"""
default_format = f"{dataset.replace('/', '-')}-{{timestamp}}"
print(f"\n{Colors.BOLD}Snapshot Naming:{Colors.ENDC}")
print(f"{Colors.CYAN}1.{Colors.ENDC} Default: {default_format}")
print(f"{Colors.CYAN}2.{Colors.ENDC} Custom format")
choice = input(f"\n{Colors.BOLD}Select naming (1-2): {Colors.ENDC}")
if choice == "2":
print(f"\n{Colors.DIM}Use {{timestamp}} for date/time, {{dataset}} for dataset name{Colors.ENDC}")
print(f"{Colors.DIM}Example: backup-{{dataset}}-{{timestamp}}{Colors.ENDC}")
custom = input(f"{Colors.BOLD}Enter format: {Colors.ENDC}")
if custom.strip():
return custom.strip()
return default_format
def manage_automations(self):
"""Manage existing automations (pause/resume/delete)"""
automations = self.get_existing_automations()
if not automations:
print(f"\n{Colors.YELLOW}No automations found to manage{Colors.ENDC}")
return
print(f"\n{Colors.BOLD}🔧 Manage Automations{Colors.ENDC}")
datasets = list(automations.keys())
# Show automations with status
for i, dataset in enumerate(datasets, 1):
status, _ = self.check_automation_health(dataset)
if status == "healthy":
status_icon = f"{Colors.GREEN}{Colors.ENDC}"
elif status == "paused":
status_icon = f"{Colors.YELLOW}{Colors.ENDC}"
else:
status_icon = f"{Colors.RED}{Colors.ENDC}"
print(f"{Colors.CYAN}{i:2d}.{Colors.ENDC} {status_icon} {dataset}")
# Select automation to manage
while True:
try:
choice = input(f"\n{Colors.BOLD}Select automation (1-{len(datasets)}): {Colors.ENDC}")
idx = int(choice) - 1
if 0 <= idx < len(datasets):
selected_dataset = datasets[idx]
break
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
except (ValueError, KeyboardInterrupt):
return
# Show management options
status, _ = self.check_automation_health(selected_dataset)
print(f"\n{Colors.BOLD}Manage '{selected_dataset}':{Colors.ENDC}")
options = []
if status == "healthy":
options.append(("1", "Pause automation", "pause"))
elif status == "paused":
options.append(("1", "Resume automation", "resume"))
options.append(("2", f"{Colors.RED}Delete automation{Colors.ENDC}", "delete"))
options.append(("3", "Cancel", "cancel"))
for num, desc, _ in options:
print(f"{Colors.CYAN}{num}.{Colors.ENDC} {desc}")
# Handle action
while True:
action_choice = input(f"\n{Colors.BOLD}Select action: {Colors.ENDC}")
action_map = {opt[0]: opt[2] for opt in options}
if action_choice in action_map:
action = action_map[action_choice]
if action == "cancel":
return
elif action == "pause":
self.show_loading("Pausing automation")
success, message = self.pause_cron_job(selected_dataset)
elif action == "resume":
self.show_loading("Resuming automation")
success, message = self.resume_cron_job(selected_dataset)
elif action == "delete":
confirm = input(f"{Colors.RED}Are you sure? This cannot be undone (y/N): {Colors.ENDC}")
if confirm.lower() != 'y':
return
self.show_loading("Deleting automation")
success, message = self.delete_cron_job(selected_dataset)
if success:
print(f"{Colors.GREEN}{message}{Colors.ENDC}")
else:
print(f"{Colors.RED}{message}{Colors.ENDC}")
return
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
def show_status(self):
"""Display status of all automations"""
automations = self.get_existing_automations()
if not automations:
print(f"\n{Colors.YELLOW}No automations found{Colors.ENDC}")
return
print(f"\n{Colors.BOLD}📊 Automation Status{Colors.ENDC}")
for dataset, info in automations.items():
status, message = self.check_automation_health(dataset)
if status == "healthy":
icon = f"{Colors.GREEN}{Colors.ENDC}"
status_text = f"{Colors.GREEN}Active{Colors.ENDC}"
elif status == "paused":
icon = f"{Colors.YELLOW}{Colors.ENDC}"
status_text = f"{Colors.YELLOW}Paused{Colors.ENDC}"
else:
icon = f"{Colors.RED}{Colors.ENDC}"
status_text = f"{Colors.RED}Error{Colors.ENDC}"
print(f"\n{icon} {Colors.BOLD}{dataset}{Colors.ENDC}")
print(f" Status: {status_text}")
print(f" Schedule: {Colors.DIM}{info['schedule']}{Colors.ENDC}")
if status == "error":
print(f" Error: {Colors.RED}{message}{Colors.ENDC}")
def create_automation(self):
"""Create new snapshot automation"""
print(f"\n{Colors.BOLD}📸 Create New Automation{Colors.ENDC}")
# Select dataset
dataset = self.select_dataset()
if not dataset:
return
# Select schedule
schedule = self.select_schedule()
if not schedule:
return
# Select naming format
name_format = self.select_naming_format(dataset)
# Create snapshot command
command = self.create_snapshot_command(dataset, name_format)
print(f"\n{Colors.YELLOW}Creating automation for {dataset}...{Colors.ENDC}")
self.show_loading("Setting up cron job")
# Add cron job
success, message = self.add_cron_job(dataset, schedule, command)
if success:
print(f"{Colors.GREEN}✓ Automation created successfully!{Colors.ENDC}")
print(f"{Colors.DIM}Dataset: {dataset}{Colors.ENDC}")
print(f"{Colors.DIM}Schedule: {schedule}{Colors.ENDC}")
else:
print(f"{Colors.RED}✗ Failed to create automation{Colors.ENDC}")
print(f"{Colors.RED}{message}{Colors.ENDC}")
def main_menu(self):
"""Display main menu and handle user input"""
automations = self.get_existing_automations()
automation_count = len(automations)
if automation_count > 0:
healthy_count = sum(1 for dataset in automations
if self.check_automation_health(dataset)[0] == "healthy")
paused_count = sum(1 for dataset in automations
if self.check_automation_health(dataset)[0] == "paused")
status_parts = []
if healthy_count > 0:
status_parts.append(f"{Colors.GREEN}{healthy_count} Active{Colors.ENDC}")
if paused_count > 0:
status_parts.append(f"{Colors.YELLOW}{paused_count} Paused{Colors.ENDC}")
status_text = " | ".join(status_parts) if status_parts else f"{Colors.RED}0 Running{Colors.ENDC}"
print(f"{Colors.BOLD}📈 Automations: {status_text}{Colors.ENDC}")
print(f"\n{Colors.BOLD}Options:{Colors.ENDC}")
print(f"{Colors.CYAN}1.{Colors.ENDC} View Status")
print(f"{Colors.CYAN}2.{Colors.ENDC} Create New Automation")
print(f"{Colors.CYAN}3.{Colors.ENDC} Manage Automations")
print(f"{Colors.CYAN}4.{Colors.ENDC} Exit")
while True:
choice = input(f"\n{Colors.BOLD}Select option (1-4): {Colors.ENDC}")
if choice == "1":
self.show_status()
break
elif choice == "2":
self.create_automation()
break
elif choice == "3":
self.manage_automations()
break
elif choice == "4":
print(f"\n{Colors.DIM}Goodbye!{Colors.ENDC}")
sys.exit(0)
else:
print(f"{Colors.RED}Invalid selection{Colors.ENDC}")
def run(self):
"""Main application loop"""
try:
while True:
os.system('clear' if os.name == 'posix' else 'cls')
self.display_banner()
self.main_menu()
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.ENDC}")
except KeyboardInterrupt:
print(f"\n\n{Colors.DIM}Goodbye!{Colors.ENDC}")
sys.exit(0)
if __name__ == "__main__":
# Check if running as root (recommended for ZFS operations)
if os.geteuid() != 0:
print(f"{Colors.YELLOW}⚠ Warning: Running without root privileges{Colors.ENDC}")
print(f"{Colors.DIM}Some ZFS operations may require sudo{Colors.ENDC}")
app = PDGSnapshot()
app.run()