#!/usr/bin/env bash # pg_scribe - Incremental SQL backup system for PostgreSQL # # This script provides a unified CLI for managing PostgreSQL backups # using logical replication and plain SQL format. set -euo pipefail # Version VERSION="0.1.0" # Exit codes EXIT_SUCCESS=0 EXIT_GENERAL_ERROR=1 EXIT_CONNECTION_ERROR=2 EXIT_SLOT_ERROR=3 EXIT_BACKUP_ERROR=4 EXIT_VALIDATION_ERROR=5 EXIT_WARNING=10 # Default values DEFAULT_SLOT="pg_scribe" DEFAULT_PORT="5432" DEFAULT_HOST="localhost" DEFAULT_STATUS_INTERVAL=10 DEFAULT_FSYNC_INTERVAL=10 # Global variables ACTION="" DBNAME="" HOST="${PGHOST:-$DEFAULT_HOST}" PORT="${PGPORT:-$DEFAULT_PORT}" USERNAME="${PGUSER:-${USER:-}}" BACKUP_DIR="" SLOT="$DEFAULT_SLOT" STATUS_INTERVAL="$DEFAULT_STATUS_INTERVAL" FSYNC_INTERVAL="$DEFAULT_FSYNC_INTERVAL" COMPRESS="" CREATE_DB=0 BASE_BACKUP="" NO_SYNC_SEQUENCES=0 INCLUDE_ACTIVE=0 NO_PASSWORD=0 FORCE_PASSWORD=0 VERBOSE=0 FORCE=0 # Color output support if [[ "${PG_COLOR:-auto}" == "always" ]] || [[ "${PG_COLOR:-auto}" == "auto" && -t 2 ]]; then RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' BLUE='\033[0;34m' BOLD='\033[1m' RESET='\033[0m' else RED='' GREEN='' YELLOW='' BLUE='' BOLD='' RESET='' fi # Logging functions (output to stderr) log_info() { echo -e "${BLUE}INFO:${RESET} $*" >&2 } log_success() { echo -e "${GREEN}SUCCESS:${RESET} $*" >&2 } log_warning() { echo -e "${YELLOW}WARNING:${RESET} $*" >&2 } log_error() { echo -e "${RED}ERROR:${RESET} $*" >&2 } log_step() { echo -e "${BOLD}==>${RESET} $*" >&2 } # Usage information usage() { cat </dev/null | cut -f1 } # Test database connection test_connection() { log_step "Testing database connection..." local psql_args mapfile -t psql_args < <(build_psql_args) if ! psql "${psql_args[@]}" -c "SELECT version();" >/dev/null 2>&1; then log_error "Failed to connect to database" log_error "Connection details: host=$HOST port=$PORT dbname=$DBNAME user=$USERNAME" exit "$EXIT_CONNECTION_ERROR" fi if [[ "$VERBOSE" -eq 1 ]]; then log_success "Connected to database" fi } # Execute SQL query and return result query_db() { local sql="$1" local psql_args mapfile -t psql_args < <(build_psql_args) psql "${psql_args[@]}" -t -A -c "$sql" 2>&1 } # Execute SQL query silently (return exit code only) query_db_silent() { local sql="$1" local psql_args mapfile -t psql_args < <(build_psql_args) psql "${psql_args[@]}" -t -A -c "$sql" >/dev/null 2>&1 } # Take a globals backup (roles, tablespaces, etc.) # Arguments: # $1 - chain directory path # Returns: # Echoes the path to the created globals backup file # Exits script on failure take_globals_backup() { local chain_dir="$1" local globals_backup_file="$chain_dir/globals.sql" log_info "Taking globals backup..." # Build pg_dumpall connection arguments local dumpall_args mapfile -t dumpall_args < <(build_pg_dumpall_args) # Add globals-only flag and output file dumpall_args+=(--globals-only) dumpall_args+=(--file="$globals_backup_file") if pg_dumpall "${dumpall_args[@]}"; then local globals_size globals_size=$(get_file_size "$globals_backup_file") log_success "Globals backup completed ($globals_size)" echo "$globals_backup_file" else log_error "Globals backup failed" # Clean up partial file rm -f "$globals_backup_file" 2>/dev/null || true exit "$EXIT_BACKUP_ERROR" fi } # Validate required arguments for a command # Arguments: command_name arg_name:description [arg_name:description ...] # Example: validate_required_args "init" "DBNAME:database" "BACKUP_DIR:backup directory" validate_required_args() { local command_name="$1" shift local validation_failed=0 for arg_spec in "$@"; do local arg_name="${arg_spec%%:*}" local arg_description="${arg_spec#*:}" # Use indirect variable reference to check if argument is set if [[ -z "${!arg_name}" ]]; then log_error "--${command_name} requires ${arg_description}" validation_failed=1 fi done if [[ "$validation_failed" -eq 1 ]]; then exit "$EXIT_VALIDATION_ERROR" fi } # Check replication slot existence # Arguments: # $1 - slot name # $2 - should_exist: 1 if slot should exist, 0 if slot should NOT exist # Exits with appropriate error code if expectation is not met check_replication_slot() { local slot_name="$1" local should_exist="$2" local slot_exists slot_exists=$(query_db "SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$slot_name';") if [[ "$should_exist" -eq 0 ]]; then # Slot should NOT exist if [[ "$slot_exists" -gt 0 ]]; then log_error "Replication slot '$slot_name' already exists" log_error "" log_error "A replication slot with this name already exists in the database." log_error "This may indicate:" log_error " - A previous initialization that was not cleaned up" log_error " - Another pg_scribe instance using the same slot name" log_error "" log_error "To resolve:" log_error " - Use a different slot name with -S/--slot option" log_error " - Or drop the existing slot (if safe):" log_error " psql -d $DBNAME -c \"SELECT pg_drop_replication_slot('$slot_name');\"" exit "$EXIT_SLOT_ERROR" fi else # Slot should exist if [[ "$slot_exists" -eq 0 ]]; then log_error "Replication slot '$slot_name' does not exist" log_error "" log_error "You must initialize the backup system first:" log_error " pg_scribe --init -d $DBNAME -f -S $slot_name" log_error "" log_error "Or verify the slot name is correct with:" log_error " psql -d $DBNAME -c \"SELECT slot_name FROM pg_replication_slots;\"" exit "$EXIT_SLOT_ERROR" fi log_success "Replication slot '$slot_name' found" fi } # # --init command implementation # cmd_init() { log_step "Initializing pg_scribe backup system" # Validate required arguments validate_required_args "init" "DBNAME:-d/--dbname" "BACKUP_DIR:-f/--file (backup directory)" # Cleanup tracking for failure handling local CREATED_SLOT="" local CREATED_FILES=() # Cleanup function for handling failures # shellcheck disable=SC2317 # Function called via trap handler cleanup_on_failure() { local exit_code=$? # Only cleanup on actual failure, not on successful exit if [[ $exit_code -ne 0 && $exit_code -ne $EXIT_WARNING ]]; then log_info "Cleaning up after failed initialization..." # Drop replication slot if we created it if [[ -n "$CREATED_SLOT" ]]; then log_info "Dropping replication slot '$CREATED_SLOT'..." query_db "SELECT pg_drop_replication_slot('$CREATED_SLOT');" 2>/dev/null || true fi # Remove files we created for file in "${CREATED_FILES[@]}"; do if [[ -f "$file" ]]; then log_info "Removing partial file: $file" rm -f "$file" 2>/dev/null || true fi done log_info "Cleanup complete" fi } # Set up cleanup trap trap cleanup_on_failure EXIT INT TERM # Test connection first test_connection # Phase 1: Validation log_step "Phase 1: Validation" local validation_failed=0 local has_warnings=0 # Check wal_level log_info "Checking wal_level configuration..." local wal_level wal_level=$(query_db "SHOW wal_level;") if [[ "$wal_level" != "logical" ]]; then log_error "CRITICAL: wal_level is '$wal_level', must be 'logical'" log_error " Fix: Add 'wal_level = logical' to postgresql.conf and restart PostgreSQL" validation_failed=1 else if [[ "$VERBOSE" -eq 1 ]]; then log_success "wal_level = logical" fi fi # Check max_replication_slots log_info "Checking max_replication_slots configuration..." local max_slots max_slots=$(query_db "SHOW max_replication_slots;") if [[ "$max_slots" -lt 1 ]]; then log_error "CRITICAL: max_replication_slots is $max_slots, must be >= 1" log_error " Fix: Add 'max_replication_slots = 10' to postgresql.conf and restart PostgreSQL" validation_failed=1 else if [[ "$VERBOSE" -eq 1 ]]; then log_success "max_replication_slots = $max_slots" fi fi # Check max_wal_senders log_info "Checking max_wal_senders configuration..." local max_senders max_senders=$(query_db "SHOW max_wal_senders;") if [[ "$max_senders" -lt 1 ]]; then log_error "CRITICAL: max_wal_senders is $max_senders, must be >= 1" log_error " Fix: Add 'max_wal_senders = 10' to postgresql.conf and restart PostgreSQL" validation_failed=1 else if [[ "$VERBOSE" -eq 1 ]]; then log_success "max_wal_senders = $max_senders" fi fi # Check replica identity on all tables log_info "Checking replica identity for all tables..." local bad_tables bad_tables=$(query_db " SELECT n.nspname || '.' || c.relname FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind = 'r' AND n.nspname NOT IN ('pg_catalog', 'information_schema') AND c.relreplident IN ('d', 'n') AND NOT EXISTS ( SELECT 1 FROM pg_index i WHERE i.indrelid = c.oid AND i.indisprimary ) ORDER BY n.nspname, c.relname; ") if [[ -n "$bad_tables" ]]; then log_error "CRITICAL: The following tables lack adequate replica identity:" while IFS= read -r table; do log_error " - $table" done <<< "$bad_tables" log_error " Fix: Add a primary key or set replica identity:" log_error " ALTER TABLE ADD PRIMARY KEY (id);" log_error " -- OR --" log_error " ALTER TABLE
REPLICA IDENTITY FULL;" validation_failed=1 else if [[ "$VERBOSE" -eq 1 ]]; then log_success "All tables have adequate replica identity" fi fi # Warning: Check for unlogged tables log_info "Checking for unlogged tables..." local unlogged_tables unlogged_tables=$(query_db " SELECT n.nspname || '.' || c.relname FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind = 'r' AND c.relpersistence = 'u' AND n.nspname NOT IN ('pg_catalog', 'information_schema') ORDER BY n.nspname, c.relname; ") if [[ -n "$unlogged_tables" ]]; then log_warning "The following unlogged tables will NOT be backed up:" while IFS= read -r table; do log_warning " - $table" done <<< "$unlogged_tables" has_warnings=1 fi # Warning: Check for large objects log_info "Checking for large objects..." local large_object_count large_object_count=$(query_db "SELECT count(*) FROM pg_largeobject_metadata;") if [[ "$large_object_count" -gt 0 ]]; then log_warning "Database contains $large_object_count large objects" log_warning "Large objects are NOT incrementally backed up (only in full backups)" log_warning "Consider using BYTEA columns instead for incremental backup support" has_warnings=1 fi # Check if validation failed if [[ "$validation_failed" -eq 1 ]]; then if [[ "$FORCE" -eq 1 ]]; then log_warning "Validation failed but --force specified, continuing anyway..." else log_error "Validation failed. Fix the CRITICAL issues above and try again." log_error "Or use --force to skip validation (NOT recommended)." exit "$EXIT_VALIDATION_ERROR" fi else log_success "All validation checks passed" fi # Phase 2: Setup log_step "Phase 2: Setup" # Create backup directory log_info "Checking backup directory..." if [[ ! -d "$BACKUP_DIR" ]]; then if ! mkdir -p "$BACKUP_DIR"; then log_error "Failed to create backup directory: $BACKUP_DIR" exit "$EXIT_BACKUP_ERROR" fi log_success "Created backup directory: $BACKUP_DIR" else # Directory exists - check if already initialized (has chains) local existing_chains existing_chains=$(find "$BACKUP_DIR" -maxdepth 1 -type d -name 'chain-*' 2>/dev/null | head -1) if [[ -n "$existing_chains" ]]; then log_error "Backup directory already initialized: $BACKUP_DIR" log_error "Found existing chain(s)" log_error "" log_error "This directory has already been initialized with pg_scribe." log_error "To create a new chain, use: pg_scribe --new-chain" log_error "" log_error "If you want to re-initialize from scratch:" log_error " 1. Stop any running backup processes" log_error " 2. Drop the replication slot (or verify it's safe to reuse)" log_error " 3. Remove or rename the existing backup directory" exit "$EXIT_VALIDATION_ERROR" fi log_info "Using existing directory: $BACKUP_DIR" fi # Generate chain ID and create chain directory local chain_id chain_id=$(get_chain_id) local chain_dir="$BACKUP_DIR/chain-$chain_id" log_info "Creating initial chain: $chain_id" if ! mkdir -p "$chain_dir"; then log_error "Failed to create chain directory: $chain_dir" exit "$EXIT_BACKUP_ERROR" fi # Create wal2sql extension log_info "Creating wal2sql extension..." if query_db_silent "CREATE EXTENSION IF NOT EXISTS wal2sql;"; then log_success "wal2sql extension created (or already exists)" else log_error "Failed to create wal2sql extension" log_error "Ensure wal2sql.so is installed in PostgreSQL's lib directory" log_error "Run: cd wal2sql && make && make install" exit "$EXIT_GENERAL_ERROR" fi # Create replication slot with snapshot export log_info "Creating logical replication slot '$SLOT'..." # Check if slot already exists check_replication_slot "$SLOT" 0 # Create slot using SQL # Note: For POC, we create the slot and take the base backup sequentially # The slot will preserve WAL from its creation LSN forward, ensuring no changes are lost local slot_result if ! slot_result=$(query_db "SELECT slot_name, lsn FROM pg_create_logical_replication_slot('$SLOT', 'wal2sql');"); then log_error "Failed to create replication slot" log_error "$slot_result" exit "$EXIT_SLOT_ERROR" fi CREATED_SLOT="$SLOT" # Track for cleanup log_success "Replication slot '$SLOT' created" # Take base backup immediately after slot creation # The slot preserves WAL from its creation point, so all changes will be captured local base_backup_file="$chain_dir/base.sql" CREATED_FILES+=("$base_backup_file") # Track for cleanup log_info "Taking base backup..." local psql_args mapfile -t psql_args < <(build_psql_args) if pg_dump "${psql_args[@]}" --file="$base_backup_file"; then local base_size base_size=$(get_file_size "$base_backup_file") log_success "Base backup completed ($base_size)" else log_error "Base backup failed" exit "$EXIT_BACKUP_ERROR" fi # Take globals backup local globals_backup_file globals_backup_file=$(take_globals_backup "$chain_dir") CREATED_FILES+=("$globals_backup_file") # Track for cleanup # Generate metadata file log_info "Generating metadata file..." local metadata_file="$chain_dir/metadata.json" CREATED_FILES+=("$metadata_file") # Track for cleanup local pg_version pg_version=$(query_db "SELECT version();") cat > "$metadata_file" <&2 log_step "Initialization Complete" log_success "Initial chain created: $chain_id" log_success "Location: $chain_dir" log_success "Replication slot: $SLOT" log_info "Next steps:" log_info " 1. Start streaming incremental backups:" log_info " pg_scribe --start -d $DBNAME -f $BACKUP_DIR -S $SLOT" log_info " 2. Monitor replication slot health:" log_info " pg_scribe --status -d $DBNAME -S $SLOT -f $BACKUP_DIR" if [[ "$has_warnings" -eq 1 ]]; then exit "$EXIT_WARNING" else exit "$EXIT_SUCCESS" fi } # # --start command implementation # cmd_start() { log_step "Starting incremental backup collection" # Validate required arguments validate_required_args "start" "DBNAME:-d/--dbname" "BACKUP_DIR:-f/--file (backup directory)" # Verify backup directory exists if [[ ! -d "$BACKUP_DIR" ]]; then log_error "Backup directory does not exist: $BACKUP_DIR" log_error "Run --init first to initialize the backup system" exit "$EXIT_BACKUP_ERROR" fi # Find latest chain log_step "Finding latest chain..." local latest_chain latest_chain=$(find "$BACKUP_DIR" -maxdepth 1 -type d -name 'chain-*' 2>/dev/null | sort | tail -1) if [[ -z "$latest_chain" ]]; then log_error "No chains found in backup directory: $BACKUP_DIR" log_error "Run --init first to create the initial chain" exit "$EXIT_BACKUP_ERROR" fi local chain_id chain_id=$(basename "$latest_chain" | sed 's/^chain-//') log_success "Found latest chain: $chain_id" # Check for existing streaming process local pidfile="$BACKUP_DIR/.pg_scribe.pid" if [[ -f "$pidfile" ]]; then local existing_pid existing_pid=$(cat "$pidfile") # Check if process is still running if kill -0 "$existing_pid" 2>/dev/null; then log_error "Already streaming to $BACKUP_DIR (PID $existing_pid)" log_error "Stop the existing process first or use a different backup directory" exit "$EXIT_GENERAL_ERROR" else log_info "Removing stale pidfile (process $existing_pid not running)" rm -f "$pidfile" fi fi # Test connection test_connection # Verify replication slot exists log_step "Verifying replication slot '$SLOT'..." check_replication_slot "$SLOT" 1 # Determine output file local output_file="$latest_chain/active.sql" # Build pg_recvlogical arguments local pg_recv_args=() mapfile -t pg_recv_args < <(build_pg_recvlogical_args) # Add required arguments pg_recv_args+=(--slot="$SLOT") pg_recv_args+=(--start) pg_recv_args+=(--file="$output_file") # Add plugin options pg_recv_args+=(--option=include_transaction=on) # Add status interval pg_recv_args+=(--status-interval="$STATUS_INTERVAL") # Add fsync interval (0 means disabled) if [[ "$FSYNC_INTERVAL" -gt 0 ]]; then pg_recv_args+=(--fsync-interval="$FSYNC_INTERVAL") else # For fsync-interval=0, we skip the parameter to avoid pg_recvlogical errors log_info "Fsync disabled (fsync-interval=0)" fi # Display configuration log_step "Configuration" log_info "Database: $DBNAME" log_info "Replication slot: $SLOT" log_info "Chain: $chain_id" log_info "Output file: $output_file" log_info "Status interval: ${STATUS_INTERVAL}s" if [[ "$FSYNC_INTERVAL" -gt 0 ]]; then log_info "Fsync interval: ${FSYNC_INTERVAL}s" else log_info "Fsync: disabled" fi echo >&2 # Write pidfile before exec (PID stays same after exec) echo $$ > "$pidfile" # Start streaming - replace this process with pg_recvlogical log_step "Starting streaming replication..." log_info "Press Ctrl+C to stop" log_info "Send SIGHUP to rotate output file" echo >&2 # Replace this process with pg_recvlogical # This eliminates signal forwarding issues and prevents orphaned processes # The PID stays the same, making cleanup in tests more reliable exec pg_recvlogical "${pg_recv_args[@]}" } # # --rotate-diff command implementation # cmd_rotate_diff() { log_step "Rotating differential file" # Validate required arguments validate_required_args "rotate-diff" "BACKUP_DIR:-f/--file (backup directory)" # Verify backup directory exists if [[ ! -d "$BACKUP_DIR" ]]; then log_error "Backup directory does not exist: $BACKUP_DIR" exit "$EXIT_BACKUP_ERROR" fi # Find and validate pidfile local pidfile="$BACKUP_DIR/.pg_scribe.pid" if [[ ! -f "$pidfile" ]]; then log_error "No active streaming process found" log_error "Pidfile not found: $pidfile" exit "$EXIT_GENERAL_ERROR" fi local pid pid=$(cat "$pidfile") # Verify process is running if ! kill -0 "$pid" 2>/dev/null; then log_error "Stale pidfile (process $pid not running)" log_error "Remove $pidfile and start streaming with --start" exit "$EXIT_GENERAL_ERROR" fi # Verify process is pg_recvlogical local proc_name proc_name=$(ps -p "$pid" -o comm= 2>/dev/null || echo "") if [[ "$proc_name" != "pg_recvlogical" ]]; then log_error "PID $pid is not pg_recvlogical (found: $proc_name)" exit "$EXIT_GENERAL_ERROR" fi log_success "Found active streaming process (PID $pid)" # Find active.sql file local active_file active_file=$(find "$BACKUP_DIR"/chain-*/active.sql 2>/dev/null | head -1) if [[ -z "$active_file" ]]; then log_error "No active.sql found in any chain" exit "$EXIT_BACKUP_ERROR" fi local chain_dir chain_dir=$(dirname "$active_file") local chain_id chain_id=$(basename "$chain_dir" | sed 's/^chain-//') log_success "Found active chain: $chain_id" # Generate differential timestamp local diff_timestamp diff_timestamp=$(get_chain_id) local sealed_file="$chain_dir/diff-$diff_timestamp.sql" # Get file size before rotation local active_size active_size=$(get_file_size "$active_file") # Atomic rotation log_info "Rotating active.sql to diff-$diff_timestamp.sql" # Rename active → diff (pg_recvlogical still has file open) if ! mv "$active_file" "$sealed_file"; then log_error "Failed to rename active.sql" exit "$EXIT_BACKUP_ERROR" fi # Send SIGHUP to trigger file rotation log_info "Sending SIGHUP to pg_recvlogical..." kill -HUP "$pid" # Wait for new active.sql to appear local timeout=30 local waited=0 log_info "Waiting for new active.sql..." while [[ $waited -lt $timeout ]]; do if [[ -f "$chain_dir/active.sql" ]]; then # Wait a moment to ensure it's being written sleep 1 if [[ -s "$chain_dir/active.sql" ]] || [[ -f "$chain_dir/active.sql" ]]; then log_success "Rotated differential: diff-$diff_timestamp.sql ($active_size)" log_success "New differential started" exit "$EXIT_SUCCESS" fi fi sleep 1 waited=$((waited + 1)) done log_error "Timeout waiting for new active.sql" log_error "The rotation may have failed - check pg_recvlogical process" exit "$EXIT_GENERAL_ERROR" } # # --new-chain command implementation # cmd_new_chain() { log_step "Creating new chain" # Validate required arguments validate_required_args "new-chain" "DBNAME:-d/--dbname" "BACKUP_DIR:-f/--file (backup directory)" # Test connection test_connection # Ensure backup directory exists if [[ ! -d "$BACKUP_DIR" ]]; then log_error "Backup directory does not exist: $BACKUP_DIR" log_error "Run --init first to initialize the backup system" exit "$EXIT_BACKUP_ERROR" fi # Generate new chain ID and create directory local new_chain_id new_chain_id=$(get_chain_id) local new_chain_dir="$BACKUP_DIR/chain-$new_chain_id" log_info "Creating new chain: $new_chain_id" if ! mkdir -p "$new_chain_dir"; then log_error "Failed to create chain directory: $new_chain_dir" exit "$EXIT_BACKUP_ERROR" fi # Take new base backup log_step "Taking base backup" local base_backup_file="$new_chain_dir/base.sql" local psql_args mapfile -t psql_args < <(build_psql_args) # Check if compression is requested local pg_dump_args=("${psql_args[@]}") if [[ -n "$COMPRESS" ]]; then log_info "Compression: $COMPRESS" pg_dump_args+=(--compress="$COMPRESS") # Add appropriate file extension for compression local compress_type="${COMPRESS%%:*}" case "$compress_type" in gzip) base_backup_file="${base_backup_file}.gz" ;; lz4) base_backup_file="${base_backup_file}.lz4" ;; zstd) base_backup_file="${base_backup_file}.zst" ;; none) # No compression, no extension ;; *) log_error "Unknown compression method: $compress_type" log_error "Supported methods: gzip, lz4, zstd, none" exit "$EXIT_VALIDATION_ERROR" ;; esac fi pg_dump_args+=(--file="$base_backup_file") if pg_dump "${pg_dump_args[@]}"; then local base_size base_size=$(get_file_size "$base_backup_file") log_success "Base backup completed ($base_size)" else log_error "Base backup failed" # Clean up partial files rm -rf "$new_chain_dir" 2>/dev/null || true exit "$EXIT_BACKUP_ERROR" fi # Take globals backup local globals_backup_file globals_backup_file=$(take_globals_backup "$new_chain_dir") # Generate metadata file log_info "Generating metadata file..." local metadata_file="$new_chain_dir/metadata.json" local pg_version pg_version=$(query_db "SELECT version();") cat > "$metadata_file" </dev/null; then streaming_active=1 fi fi # Final summary echo >&2 log_step "New Chain Complete" log_success "Chain created: $new_chain_id" log_success "Location: $new_chain_dir" log_success "Base backup: $(basename "$base_backup_file")" if [[ $streaming_active -eq 1 ]]; then echo >&2 log_info "Active streaming process detected (PID $old_pid)" log_info "" log_info "To transition to the new chain:" log_info " 1. Stop the current streaming process:" log_info " kill -TERM $old_pid" log_info " 2. Start streaming to the new chain:" log_info " pg_scribe --start -d $DBNAME -f $BACKUP_DIR -S $SLOT" log_info "" log_info "Or use --rotate-diff to seal a differential before transitioning" else echo >&2 log_info "No active streaming process detected" log_info "" log_info "Start streaming to the new chain:" log_info " pg_scribe --start -d $DBNAME -f $BACKUP_DIR -S $SLOT" fi exit "$EXIT_SUCCESS" } # # --restore command implementation # cmd_restore() { log_step "Restoring database from backup" # Validate required arguments validate_required_args "restore" "DBNAME:-d/--dbname (target database)" "BACKUP_DIR:-f/--file (backup directory)" # Verify backup directory exists if [[ ! -d "$BACKUP_DIR" ]]; then log_error "Backup directory does not exist: $BACKUP_DIR" exit "$EXIT_BACKUP_ERROR" fi # Determine target chain log_step "Locating chain" local chain_dir="" local chain_id="" if [[ -n "$BASE_BACKUP" ]]; then # BASE_BACKUP can be a chain ID or a specific chain directory if [[ -d "$BACKUP_DIR/chain-$BASE_BACKUP" ]]; then chain_dir="$BACKUP_DIR/chain-$BASE_BACKUP" chain_id="$BASE_BACKUP" log_info "Using specified chain: $chain_id" elif [[ -d "$BASE_BACKUP" ]] && [[ "$(basename "$BASE_BACKUP")" =~ ^chain- ]]; then chain_dir="$BASE_BACKUP" chain_id=$(basename "$chain_dir" | sed 's/^chain-//') log_info "Using specified chain directory: $chain_id" else log_error "Chain not found: $BASE_BACKUP" exit "$EXIT_BACKUP_ERROR" fi else # Use latest chain chain_dir=$(find "$BACKUP_DIR" -maxdepth 1 -type d -name 'chain-*' 2>/dev/null | sort | tail -1) if [[ -z "$chain_dir" ]]; then log_error "No chains found in backup directory: $BACKUP_DIR" log_error "Run --init first to create a chain" exit "$EXIT_BACKUP_ERROR" fi chain_id=$(basename "$chain_dir" | sed 's/^chain-//') log_success "Found latest chain: $chain_id" fi # Verify chain structure local base_backup_path="$chain_dir/base.sql" local globals_backup_path="$chain_dir/globals.sql" # Check for compressed base backup if [[ ! -f "$base_backup_path" ]]; then # Try compressed variants if [[ -f "$base_backup_path.gz" ]]; then base_backup_path="$base_backup_path.gz" elif [[ -f "$base_backup_path.lz4" ]]; then base_backup_path="$base_backup_path.lz4" elif [[ -f "$base_backup_path.zst" ]]; then base_backup_path="$base_backup_path.zst" else log_error "Base backup not found in chain: $chain_id" exit "$EXIT_BACKUP_ERROR" fi fi log_info "Base backup: $(basename "$base_backup_path")" # Find all sealed differentials (sorted by timestamp) local differential_files=() mapfile -t differential_files < <(find "$chain_dir" -maxdepth 1 -name 'diff-*.sql' 2>/dev/null | sort) if [[ ${#differential_files[@]} -gt 0 ]]; then log_info "Found ${#differential_files[@]} sealed differential(s)" else log_info "No sealed differentials found (will restore base backup only)" fi # Create target database if requested if [[ "$CREATE_DB" -eq 1 ]]; then log_step "Creating target database" # Connect to postgres database (not target database) to create it local create_dbname="$DBNAME" DBNAME="postgres" # Test connection to postgres database test_connection # Check if database already exists local db_exists db_exists=$(query_db "SELECT count(*) FROM pg_database WHERE datname = '$create_dbname';") if [[ "$db_exists" -gt 0 ]]; then log_error "Database '$create_dbname' already exists" log_error "Drop it first or omit --create flag to restore into existing database" exit "$EXIT_BACKUP_ERROR" fi # Create database if query_db_silent "CREATE DATABASE \"$create_dbname\";"; then log_success "Created database: $create_dbname" else log_error "Failed to create database: $create_dbname" exit "$EXIT_BACKUP_ERROR" fi # Switch back to target database for subsequent operations DBNAME="$create_dbname" fi # Test connection to target database test_connection # Restore globals backup if [[ -f "$globals_backup_path" ]]; then log_step "Restoring globals (roles, tablespaces)" # Build connection args for psql # Note: globals must be restored to postgres database, not target database local save_dbname="$DBNAME" DBNAME="postgres" local psql_args mapfile -t psql_args < <(build_psql_args) DBNAME="$save_dbname" if psql "${psql_args[@]}" -f "$globals_backup_path" >/dev/null 2>&1; then log_success "Globals restored successfully" else log_warning "Globals restore had errors (may be expected if roles already exist)" fi else log_warning "No globals backup found in chain (roles and tablespaces will not be restored)" fi # Restore base backup log_step "Restoring base backup" local start_time start_time=$(date +%s) local psql_args mapfile -t psql_args < <(build_psql_args) # Handle compressed backups if [[ "$base_backup_path" == *.gz ]]; then log_info "Decompressing gzip backup..." if gunzip -c "$base_backup_path" | psql "${psql_args[@]}" >/dev/null 2>&1; then log_success "Base backup restored successfully" else log_error "Base backup restore failed" exit "$EXIT_BACKUP_ERROR" fi elif [[ "$base_backup_path" == *.zst ]]; then log_info "Decompressing zstd backup..." if zstd -dc "$base_backup_path" | psql "${psql_args[@]}" >/dev/null 2>&1; then log_success "Base backup restored successfully" else log_error "Base backup restore failed" exit "$EXIT_BACKUP_ERROR" fi elif [[ "$base_backup_path" == *.lz4 ]]; then log_info "Decompressing lz4 backup..." if lz4 -dc "$base_backup_path" | psql "${psql_args[@]}" >/dev/null 2>&1; then log_success "Base backup restored successfully" else log_error "Base backup restore failed" exit "$EXIT_BACKUP_ERROR" fi else # Uncompressed backup if psql "${psql_args[@]}" -f "$base_backup_path" >/dev/null 2>&1; then log_success "Base backup restored successfully" else log_error "Base backup restore failed" exit "$EXIT_BACKUP_ERROR" fi fi # Apply sealed differentials if [[ ${#differential_files[@]} -gt 0 ]]; then log_step "Applying sealed differentials" local diff_count=0 for diff_file in "${differential_files[@]}"; do log_info "Applying: $(basename "$diff_file")" if psql "${psql_args[@]}" -f "$diff_file" >/dev/null 2>&1; then if [[ "$VERBOSE" -eq 1 ]]; then log_success "Applied: $(basename "$diff_file")" fi diff_count=$((diff_count + 1)) else log_error "Failed to apply differential: $(basename "$diff_file")" log_error "Restore is incomplete" exit "$EXIT_BACKUP_ERROR" fi done log_success "Applied $diff_count sealed differential(s)" fi # Apply active.sql if requested (WARNING: may be incomplete) if [[ "$INCLUDE_ACTIVE" -eq 1 ]] && [[ -f "$chain_dir/active.sql" ]]; then log_step "Applying active.sql (INCOMPLETE DATA WARNING)" log_warning "active.sql may contain incomplete transactions!" local psql_args mapfile -t psql_args < <(build_psql_args) if psql "${psql_args[@]}" -f "$chain_dir/active.sql" >/dev/null 2>&1; then log_warning "Applied incomplete active.sql - verify data integrity!" else log_error "Failed to apply active.sql" exit "$EXIT_BACKUP_ERROR" fi elif [[ "$INCLUDE_ACTIVE" -eq 1 ]]; then log_warning "No active.sql found in chain (--include-active was specified)" fi # Synchronize sequences if [[ "$NO_SYNC_SEQUENCES" -eq 0 ]]; then log_step "Synchronizing sequences" # Query all sequences and their associated tables local seq_sync_sql seq_sync_sql=$(query_db " SELECT 'SELECT setval(' || quote_literal(sn.nspname || '.' || s.relname) || ', GREATEST((SELECT COALESCE(MAX(' || quote_ident(a.attname) || '), 1) FROM ' || quote_ident(tn.nspname) || '.' || quote_ident(t.relname) || '), 1));' FROM pg_class s JOIN pg_namespace sn ON sn.oid = s.relnamespace JOIN pg_depend d ON d.objid = s.oid AND d.deptype = 'a' JOIN pg_class t ON t.oid = d.refobjid JOIN pg_namespace tn ON tn.oid = t.relnamespace JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = d.refobjsubid WHERE s.relkind = 'S' AND sn.nspname NOT IN ('pg_catalog', 'information_schema') ORDER BY sn.nspname, s.relname; " 2>/dev/null) if [[ -n "$seq_sync_sql" ]]; then local seq_count=0 while IFS= read -r sync_cmd; do if query_db_silent "$sync_cmd"; then seq_count=$((seq_count + 1)) if [[ "$VERBOSE" -eq 1 ]]; then log_info "Synced sequence: $(echo "$sync_cmd" | grep -oP "'\K[^']+(?=')")" fi else log_warning "Failed to sync sequence: $sync_cmd" fi done <<< "$seq_sync_sql" log_success "Synchronized $seq_count sequence(s)" else log_info "No sequences found to synchronize" fi else log_info "Skipping sequence synchronization (--no-sync-sequences specified)" fi # Calculate restore duration local end_time end_time=$(date +%s) local duration=$((end_time - start_time)) # Report statistics log_step "Restore Statistics" # Count rows in all tables log_info "Counting rows in restored tables..." local table_count table_count=$(query_db "SELECT count(*) FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind = 'r' AND n.nspname NOT IN ('pg_catalog', 'information_schema');" 2>/dev/null) local total_rows total_rows=$(query_db " SELECT COALESCE(SUM(n_live_tup), 0) FROM pg_stat_user_tables; " 2>/dev/null) echo -e "${BOLD}Database:${RESET} $DBNAME" >&2 echo -e "${BOLD}Chain:${RESET} $chain_id" >&2 echo -e "${BOLD}Tables Restored:${RESET} $table_count" >&2 echo -e "${BOLD}Total Rows:${RESET} $total_rows (approximate)" >&2 echo -e "${BOLD}Duration:${RESET} ${duration}s" >&2 echo -e "${BOLD}Base Backup:${RESET} $(basename "$base_backup_path")" >&2 if [[ ${#differential_files[@]} -gt 0 ]]; then echo -e "${BOLD}Differentials Applied:${RESET} ${#differential_files[@]}" >&2 fi if [[ "$INCLUDE_ACTIVE" -eq 1 ]] && [[ -f "$chain_dir/active.sql" ]]; then echo -e "${BOLD}Included active.sql:${RESET} ${YELLOW}YES (incomplete data)${RESET}" >&2 fi # Final success message echo >&2 log_step "Restore Complete" log_success "Database successfully restored to: $DBNAME" log_info "Next steps:" log_info " 1. Verify data integrity:" log_info " psql -d $DBNAME -c 'SELECT COUNT(*) FROM ;'" log_info " 2. Run application smoke tests" log_info " 3. Switch application to restored database" exit "$EXIT_SUCCESS" } # # --status command implementation # cmd_status() { log_step "Checking pg_scribe backup system status" # Validate required arguments validate_required_args "status" "DBNAME:-d/--dbname" # Test connection test_connection # Track warnings for exit code local has_warnings=0 # Check replication slot status log_step "Replication Slot Status" # Verify replication slot exists check_replication_slot "$SLOT" 1 # Query slot details local slot_info slot_info=$(query_db " SELECT slot_name, slot_type, database, active, restart_lsn, confirmed_flush_lsn, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) as restart_lag_bytes, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) as confirmed_lag_bytes, pg_current_wal_lsn() as current_lsn FROM pg_replication_slots WHERE slot_name = '$SLOT'; " | head -1) # Parse slot info IFS='|' read -r slot_name slot_type db_name active restart_lsn confirmed_flush_lsn restart_lag_bytes confirmed_lag_bytes current_lsn <<< "$slot_info" # Display slot information echo -e "${BOLD}Slot Name:${RESET} $slot_name" >&2 echo -e "${BOLD}Slot Type:${RESET} $slot_type" >&2 echo -e "${BOLD}Database:${RESET} $db_name" >&2 if [[ "$active" == "t" ]]; then echo -e "${BOLD}Active:${RESET} ${GREEN}Yes${RESET}" >&2 else echo -e "${BOLD}Active:${RESET} ${YELLOW}No${RESET}" >&2 log_warning "Replication slot is not active" has_warnings=1 fi echo -e "${BOLD}Current WAL LSN:${RESET} $current_lsn" >&2 echo -e "${BOLD}Restart LSN:${RESET} $restart_lsn" >&2 echo -e "${BOLD}Confirmed LSN:${RESET} $confirmed_flush_lsn" >&2 # Format lag in human-readable sizes local restart_lag_mb=$((restart_lag_bytes / 1024 / 1024)) local confirmed_lag_mb=$((confirmed_lag_bytes / 1024 / 1024)) # Check lag thresholds (based on design doc) if [[ "$restart_lag_bytes" -gt 10737418240 ]]; then # > 10GB - CRITICAL echo -e "${BOLD}Restart Lag:${RESET} ${RED}${restart_lag_mb} MB (CRITICAL!)${RESET}" >&2 log_error "CRITICAL: Replication lag exceeds 10GB!" log_error " This may cause disk space issues or database shutdown" log_error " Consider dropping the slot if backup collection has stopped" has_warnings=1 elif [[ "$restart_lag_bytes" -gt 1073741824 ]]; then # > 1GB - WARNING echo -e "${BOLD}Restart Lag:${RESET} ${YELLOW}${restart_lag_mb} MB (WARNING)${RESET}" >&2 log_warning "Replication lag exceeds 1GB" log_warning " Ensure backup collection is running and healthy" has_warnings=1 else echo -e "${BOLD}Restart Lag:${RESET} ${GREEN}${restart_lag_mb} MB${RESET}" >&2 fi if [[ "$confirmed_lag_bytes" -gt 10737418240 ]]; then echo -e "${BOLD}Confirmed Lag:${RESET} ${RED}${confirmed_lag_mb} MB (CRITICAL!)${RESET}" >&2 has_warnings=1 elif [[ "$confirmed_lag_bytes" -gt 1073741824 ]]; then echo -e "${BOLD}Confirmed Lag:${RESET} ${YELLOW}${confirmed_lag_mb} MB (WARNING)${RESET}" >&2 has_warnings=1 else echo -e "${BOLD}Confirmed Lag:${RESET} ${GREEN}${confirmed_lag_mb} MB${RESET}" >&2 fi # Check slot age (if we can determine it) # Note: pg_replication_slots doesn't directly track creation time, but we can estimate from WAL echo >&2 # Analyze backup directory if provided if [[ -n "$BACKUP_DIR" ]]; then log_step "Chain Inventory" if [[ ! -d "$BACKUP_DIR" ]]; then log_warning "Backup directory does not exist: $BACKUP_DIR" has_warnings=1 else # Find all chains local chains mapfile -t chains < <(find "$BACKUP_DIR" -maxdepth 1 -type d -name 'chain-*' 2>/dev/null | sort) if [[ ${#chains[@]} -eq 0 ]]; then log_warning "No chains found in backup directory" log_warning " Run --init to create the initial chain" has_warnings=1 else echo -e "${BOLD}Backup Directory:${RESET} $BACKUP_DIR" >&2 echo "" >&2 # Determine which chain is active local pidfile="$BACKUP_DIR/.pg_scribe.pid" local active_chain_id="" local active_pid="" if [[ -f "$pidfile" ]]; then local pid pid=$(cat "$pidfile") if kill -0 "$pid" 2>/dev/null; then # Find which chain has active.sql local active_file active_file=$(find "$BACKUP_DIR"/chain-*/active.sql 2>/dev/null | head -1) if [[ -n "$active_file" ]]; then active_chain_id=$(basename "$(dirname "$active_file")" | sed 's/^chain-//') active_pid="$pid" fi fi fi # Display each chain for chain_dir in "${chains[@]}"; do local chain_id chain_id=$(basename "$chain_dir" | sed 's/^chain-//') # Gather chain info local base_backup="$chain_dir/base.sql" local base_size="" # Check for compressed variants if [[ -f "$base_backup" ]]; then base_size=$(get_file_size "$base_backup") elif [[ -f "$base_backup.gz" ]]; then base_size=$(get_file_size "$base_backup.gz") elif [[ -f "$base_backup.lz4" ]]; then base_size=$(get_file_size "$base_backup.lz4") elif [[ -f "$base_backup.zst" ]]; then base_size=$(get_file_size "$base_backup.zst") fi local diff_count diff_count=$(find "$chain_dir" -maxdepth 1 -name 'diff-*.sql' 2>/dev/null | wc -l) local total_size total_size=$(du -sh "$chain_dir" 2>/dev/null | cut -f1) # Check if this chain is active if [[ "$chain_id" == "$active_chain_id" ]]; then echo -e " ${GREEN}chain-$chain_id${RESET} ${BOLD}(ACTIVE - streaming)${RESET}" >&2 echo -e " ${BOLD}PID:${RESET} $active_pid" >&2 else echo -e " chain-$chain_id" >&2 fi echo -e " ${BOLD}Base backup:${RESET} $base_size" >&2 echo -e " ${BOLD}Differentials:${RESET} $diff_count sealed" >&2 echo -e " ${BOLD}Total size:${RESET} $total_size" >&2 # Show last activity if active.sql exists if [[ -f "$chain_dir/active.sql" ]]; then local last_mod last_mod=$(stat -c %y "$chain_dir/active.sql" 2>/dev/null | cut -d. -f1) local age_seconds age_seconds=$(( $(date +%s) - $(stat -c %Y "$chain_dir/active.sql" 2>/dev/null) )) local age_minutes=$((age_seconds / 60)) echo -e " ${BOLD}Last activity:${RESET} $last_mod ($age_minutes minutes ago)" >&2 # Warn if last activity is old (only for active chain) if [[ "$chain_id" == "$active_chain_id" ]] && [[ "$age_minutes" -gt 60 ]]; then log_warning "Active chain has no activity for ${age_minutes} minutes" log_warning " Verify that streaming is working correctly" has_warnings=1 fi fi echo "" >&2 done # Calculate total backup directory size local total_size total_size=$(du -sh "$BACKUP_DIR" 2>/dev/null | cut -f1) echo -e "${BOLD}Total Backup Size:${RESET} $total_size" >&2 fi fi fi # Overall health summary echo >&2 log_step "Health Summary" if [[ "$has_warnings" -eq 0 ]]; then log_success "System is healthy" echo >&2 log_info "Replication slot is active and lag is acceptable" if [[ -n "$BACKUP_DIR" ]]; then log_info "Backup directory appears healthy" fi exit "$EXIT_SUCCESS" else log_warning "System has warnings - review messages above" echo >&2 log_info "Address any CRITICAL or WARNING issues promptly" log_info "See design doc for monitoring recommendations" exit "$EXIT_WARNING" fi } # Main entry point main() { parse_args "$@" case "$ACTION" in init) cmd_init ;; start) cmd_start ;; rotate-diff) cmd_rotate_diff ;; new-chain) cmd_new_chain ;; restore) cmd_restore ;; status) cmd_status ;; *) log_error "Unknown action: $ACTION" exit "$EXIT_GENERAL_ERROR" ;; esac } # Run main with all arguments main "$@"