]> begriffs open source - pg_scribe/blob - scripts/pg_scribe
Apply diffs faster with synchronous_commit = off
[pg_scribe] / scripts / pg_scribe
1 #!/usr/bin/env bash
2
3 # pg_scribe - Incremental SQL backup system for PostgreSQL
4 #
5 # This script provides a unified CLI for managing PostgreSQL backups
6 # using logical replication and plain SQL format.
7
# Strict mode: stop on unhandled command failures, on use of unset variables,
# and when any stage of a pipeline fails.
set -euo pipefail

# Version
readonly VERSION="0.1.0"

# Exit codes (read-only so no code path can redefine them by accident)
readonly EXIT_SUCCESS=0
readonly EXIT_GENERAL_ERROR=1
readonly EXIT_CONNECTION_ERROR=2
readonly EXIT_SLOT_ERROR=3
readonly EXIT_BACKUP_ERROR=4
readonly EXIT_VALIDATION_ERROR=5
readonly EXIT_WARNING=10

# Default values
readonly DEFAULT_SLOT="pg_scribe"
readonly DEFAULT_PORT="5432"
readonly DEFAULT_HOST="localhost"
readonly DEFAULT_STATUS_INTERVAL=10
readonly DEFAULT_FSYNC_INTERVAL=10

# Global variables.
# Connection settings start from the PGHOST / PGPORT / PGUSER environment
# variables (falling back to the defaults above, or $USER for the user name);
# the remaining values start empty/zero.
ACTION=""
DBNAME=""
HOST="${PGHOST:-$DEFAULT_HOST}"
PORT="${PGPORT:-$DEFAULT_PORT}"
USERNAME="${PGUSER:-${USER:-}}"
BACKUP_DIR=""
SLOT="$DEFAULT_SLOT"
STATUS_INTERVAL="$DEFAULT_STATUS_INTERVAL"
FSYNC_INTERVAL="$DEFAULT_FSYNC_INTERVAL"
COMPRESS=""
CREATE_DB=0
BASE_BACKUP=""
NO_SYNC_SEQUENCES=0
INCLUDE_ACTIVE=0
NO_PASSWORD=0
FORCE_PASSWORD=0
VERBOSE=0
FORCE=0
IF_NOT_EXISTS=0
AUTO_START=0
50
# Color output support.
# Start with every color code empty (plain output), then switch the codes on
# when PG_COLOR is "always", or when it is "auto"/unset and stderr is a terminal.
RED=''
GREEN=''
YELLOW=''
BLUE=''
BOLD=''
RESET=''
if [[ "${PG_COLOR:-auto}" == "always" || ( "${PG_COLOR:-auto}" == "auto" && -t 2 ) ]]; then
    RED='\033[0;31m'
    GREEN='\033[0;32m'
    YELLOW='\033[1;33m'
    BLUE='\033[0;34m'
    BOLD='\033[1m'
    RESET='\033[0m'
fi
67
68 # Logging functions (output to stderr)
# Print an informational message to stderr.
# Arguments:
#   $* - message words (joined with spaces)
# Only the colored prefix goes through %b (so its \033 codes are expanded);
# the message itself is printed verbatim, so backslashes in paths, identifiers
# or captured psql output are not reinterpreted as escape sequences.
log_info() {
    printf '%b %s\n' "${BLUE}INFO:${RESET}" "$*" >&2
}
72
# Print a success message to stderr.
# Arguments:
#   $* - message words (joined with spaces)
# Only the colored prefix goes through %b; the message is printed verbatim so
# backslashes in it are never reinterpreted as escape sequences.
log_success() {
    printf '%b %s\n' "${GREEN}SUCCESS:${RESET}" "$*" >&2
}
76
# Print a warning message to stderr.
# Arguments:
#   $* - message words (joined with spaces)
# Only the colored prefix goes through %b; the message is printed verbatim so
# backslashes in it are never reinterpreted as escape sequences.
log_warning() {
    printf '%b %s\n' "${YELLOW}WARNING:${RESET}" "$*" >&2
}
80
# Print an error message to stderr.
# Arguments:
#   $* - message words (joined with spaces)
# Only the colored prefix goes through %b; the message is printed verbatim so
# backslashes in it (e.g. in captured psql output) are never reinterpreted as
# escape sequences.
log_error() {
    printf '%b %s\n' "${RED}ERROR:${RESET}" "$*" >&2
}
84
# Print a step heading ("==> ...") to stderr.
# Arguments:
#   $* - heading words (joined with spaces)
# Only the styled prefix goes through %b; the heading text is printed verbatim
# so backslashes in it are never reinterpreted as escape sequences.
log_step() {
    printf '%b %s\n' "${BOLD}==>${RESET}" "$*" >&2
}
88
# Usage information.
# Prints the full help text to stdout. The DEFAULT_* values are expanded into
# the text; \$PGUSER and \$USER are escaped so they appear literally.
usage() {
    cat <<EOF
pg_scribe - Incremental SQL backup system for PostgreSQL

Usage:
  pg_scribe --init [OPTIONS]
  pg_scribe --start [OPTIONS]
  pg_scribe --stop [OPTIONS]
  pg_scribe --rotate-diff [OPTIONS]
  pg_scribe --new-chain [OPTIONS]
  pg_scribe --restore [OPTIONS]
  pg_scribe --status [OPTIONS]
  pg_scribe --version
  pg_scribe --help

Actions (exactly one required):
  --init                Initialize backup system (create first chain)
  --start               Start streaming incremental backups
  --stop                Stop active streaming process
  --rotate-diff         Rotate differential file within active chain
  --new-chain           Create new chain with fresh base backup
  --restore             Restore from backups
  --status              Check replication slot and chain inventory
  -V, --version         Print version and exit
  -?, --help            Show this help and exit

Connection Options:
  -d, --dbname=DBNAME   Database name (can be connection string)
  -h, --host=HOSTNAME   Database server host (default: $DEFAULT_HOST)
  -p, --port=PORT       Database server port (default: $DEFAULT_PORT)
  -U, --username=NAME   Database user (default: \$PGUSER or \$USER)
  -w, --no-password     Never prompt for password
  -W, --password        Force password prompt

General Options:
  -v, --verbose         Enable verbose mode

Options for --init:
  -d, --dbname=DBNAME   Database name (required)
  -f, --backup-dir=DIR  Backup output directory (required)
  -S, --slot=SLOTNAME   Replication slot name (default: $DEFAULT_SLOT)
  --if-not-exists       Do not error if backup directory already initialized
  --force               Skip validation and force initialization

Options for --start:
  -d, --dbname=DBNAME   Database name (required)
  -f, --backup-dir=DIR  Backup directory containing chains (required)
  -s, --status-interval=SECS   Status update interval (default: $DEFAULT_STATUS_INTERVAL)
  -F, --fsync-interval=SECS    Fsync interval (default: $DEFAULT_FSYNC_INTERVAL, 0 to disable)
  Note: Replication slot is read from chain metadata

Options for --stop:
  -f, --backup-dir=DIR  Backup directory containing active process (required)

Options for --rotate-diff:
  -f, --backup-dir=DIR  Backup directory containing active chain (required)

Options for --new-chain:
  -d, --dbname=DBNAME   Database name (required)
  -f, --backup-dir=DIR  Backup directory for new chain (required)
  -S, --slot=SLOTNAME   Replication slot name (default: $DEFAULT_SLOT)
  -Z, --compress=METHOD Compression: gzip, lz4, zstd, or none (default: none)
  --start               Stop old streaming and start streaming to new chain

Options for --restore:
  -f, --backup-dir=DIR  Backup directory containing chains (required)
  -d, --dbname=DBNAME   Target database name (required)
  -C, --create          Create target database
  --base-backup=ID      Specific chain ID to restore (default: latest)
  --include-active      Include active.sql (risky - may have incomplete data)
  --no-sync-sequences   Skip sequence synchronization

Options for --status:
  -S, --slot=SLOTNAME   Replication slot name (default: $DEFAULT_SLOT)
  -f, --backup-dir=DIR  Backup directory to analyze (optional)

Exit Status:
  0   Success
  1   General error
  2   Database connection error
  3   Replication slot error
  4   Backup/restore error
  5   Invalid arguments or validation failure
  10  Success with warnings

Environment Variables:
  PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD, PG_COLOR

Report bugs to: https://github.com/your-repo/pg_scribe/issues
EOF
}
177
# Abort with a validation error when an option that takes a value is the last
# word on the command line (so there is no "$2" to read).
# Arguments:
#   $1 - the option as typed
#   $2 - number of words still unparsed, including the option itself
_require_option_value() {
    if (( $2 < 2 )); then
        log_error "Option $1 requires an argument"
        exit "$EXIT_VALIDATION_ERROR"
    fi
}

# Parse command line arguments into the global option variables.
# Arguments:
#   $@ - the script's command line
# Behavior:
#   - Exactly one action flag is accepted; "--start" combined with
#     "--new-chain" (in either order) selects new-chain with AUTO_START=1.
#   - Value options accept both "-x VALUE" / "--long VALUE" and "--long=VALUE".
#   - --version / --help print and exit with EXIT_SUCCESS.
#   - No arguments, an unknown option, a missing option value, conflicting
#     actions or no action at all exit with EXIT_VALIDATION_ERROR.
#   - DBNAME falls back to $PGDATABASE when not given.
parse_args() {
    if [[ $# -eq 0 ]]; then
        usage
        exit "$EXIT_VALIDATION_ERROR"
    fi

    while [[ $# -gt 0 ]]; do
        case "$1" in
            --init|--stop|--rotate-diff|--restore|--status)
                if [[ -n "$ACTION" ]]; then
                    log_error "Multiple action flags specified"
                    exit "$EXIT_VALIDATION_ERROR"
                fi
                # The action name is the flag without its leading dashes
                ACTION="${1#--}"
                shift
                ;;
            --start)
                # Can be either an action or a modifier for --new-chain
                if [[ -z "$ACTION" ]]; then
                    ACTION="start"
                elif [[ "$ACTION" == "new-chain" ]]; then
                    AUTO_START=1
                else
                    log_error "Multiple action flags specified"
                    exit "$EXIT_VALIDATION_ERROR"
                fi
                shift
                ;;
            --new-chain)
                if [[ -z "$ACTION" ]]; then
                    ACTION="new-chain"
                elif [[ "$ACTION" == "start" ]]; then
                    # "--start" was seen first: treat it as the modifier
                    ACTION="new-chain"
                    AUTO_START=1
                else
                    log_error "Multiple action flags specified"
                    exit "$EXIT_VALIDATION_ERROR"
                fi
                shift
                ;;
            -V|--version)
                echo "pg_scribe $VERSION"
                exit "$EXIT_SUCCESS"
                ;;
            -\?|--help)
                usage
                exit "$EXIT_SUCCESS"
                ;;
            -d|--dbname)
                _require_option_value "$1" "$#"
                DBNAME="$2"
                shift 2
                ;;
            --dbname=*)
                DBNAME="${1#*=}"
                shift
                ;;
            -h|--host)
                _require_option_value "$1" "$#"
                HOST="$2"
                shift 2
                ;;
            --host=*)
                HOST="${1#*=}"
                shift
                ;;
            -p|--port)
                _require_option_value "$1" "$#"
                PORT="$2"
                shift 2
                ;;
            --port=*)
                PORT="${1#*=}"
                shift
                ;;
            -U|--username)
                _require_option_value "$1" "$#"
                USERNAME="$2"
                shift 2
                ;;
            --username=*)
                USERNAME="${1#*=}"
                shift
                ;;
            -f|--file|--backup-dir)
                _require_option_value "$1" "$#"
                BACKUP_DIR="$2"
                shift 2
                ;;
            --file=*|--backup-dir=*)
                BACKUP_DIR="${1#*=}"
                shift
                ;;
            -S|--slot)
                _require_option_value "$1" "$#"
                SLOT="$2"
                shift 2
                ;;
            --slot=*)
                SLOT="${1#*=}"
                shift
                ;;
            -s|--status-interval)
                _require_option_value "$1" "$#"
                STATUS_INTERVAL="$2"
                shift 2
                ;;
            --status-interval=*)
                STATUS_INTERVAL="${1#*=}"
                shift
                ;;
            -F|--fsync-interval)
                _require_option_value "$1" "$#"
                FSYNC_INTERVAL="$2"
                shift 2
                ;;
            --fsync-interval=*)
                FSYNC_INTERVAL="${1#*=}"
                shift
                ;;
            -Z|--compress)
                _require_option_value "$1" "$#"
                COMPRESS="$2"
                shift 2
                ;;
            --compress=*)
                COMPRESS="${1#*=}"
                shift
                ;;
            -C|--create)
                CREATE_DB=1
                shift
                ;;
            --base-backup)
                _require_option_value "$1" "$#"
                BASE_BACKUP="$2"
                shift 2
                ;;
            --base-backup=*)
                BASE_BACKUP="${1#*=}"
                shift
                ;;
            --no-sync-sequences)
                NO_SYNC_SEQUENCES=1
                shift
                ;;
            --include-active)
                INCLUDE_ACTIVE=1
                shift
                ;;
            -w|--no-password)
                NO_PASSWORD=1
                shift
                ;;
            -W|--password)
                FORCE_PASSWORD=1
                shift
                ;;
            -v|--verbose)
                VERBOSE=1
                shift
                ;;
            --force)
                FORCE=1
                shift
                ;;
            --if-not-exists)
                IF_NOT_EXISTS=1
                shift
                ;;
            *)
                log_error "Unknown option: $1"
                usage
                exit "$EXIT_VALIDATION_ERROR"
                ;;
        esac
    done

    # Validate action was specified
    if [[ -z "$ACTION" ]]; then
        log_error "No action specified"
        usage
        exit "$EXIT_VALIDATION_ERROR"
    fi

    # Use PGDATABASE if dbname not specified
    if [[ -z "$DBNAME" && -n "${PGDATABASE:-}" ]]; then
        DBNAME="$PGDATABASE"
    fi
}
376
# Build psql connection arguments.
# Reads the global connection settings (DBNAME, HOST, PORT, USERNAME,
# NO_PASSWORD, FORCE_PASSWORD) and prints one argument per line, suitable for
# `mapfile -t`. Empty settings are skipped.
# Prints nothing at all when no setting applies: a bare `printf '%s\n'` with
# zero arguments would still emit one blank line, which callers would turn
# into a spurious empty argument.
build_psql_args() {
    local -a args=()

    [[ -n "$DBNAME" ]] && args+=(-d "$DBNAME")
    [[ -n "$HOST" ]] && args+=(-h "$HOST")
    [[ -n "$PORT" ]] && args+=(-p "$PORT")
    [[ -n "$USERNAME" ]] && args+=(-U "$USERNAME")
    [[ "$NO_PASSWORD" -eq 1 ]] && args+=(-w)
    [[ "$FORCE_PASSWORD" -eq 1 ]] && args+=(-W)

    if (( ${#args[@]} > 0 )); then
        printf '%s\n' "${args[@]}"
    fi
}
390
# Build pg_recvlogical connection arguments.
# Reads the global connection settings (DBNAME, HOST, PORT, USERNAME,
# NO_PASSWORD, FORCE_PASSWORD) and prints one argument per line, suitable for
# `mapfile -t`. Empty settings are skipped.
# Prints nothing at all when no setting applies: a bare `printf '%s\n'` with
# zero arguments would still emit one blank line, which callers would turn
# into a spurious empty argument.
build_pg_recvlogical_args() {
    local -a args=()

    [[ -n "$DBNAME" ]] && args+=(-d "$DBNAME")
    [[ -n "$HOST" ]] && args+=(-h "$HOST")
    [[ -n "$PORT" ]] && args+=(-p "$PORT")
    [[ -n "$USERNAME" ]] && args+=(-U "$USERNAME")
    [[ "$NO_PASSWORD" -eq 1 ]] && args+=(-w)
    [[ "$FORCE_PASSWORD" -eq 1 ]] && args+=(-W)

    if (( ${#args[@]} > 0 )); then
        printf '%s\n' "${args[@]}"
    fi
}
404
# Build pg_dumpall connection arguments (no -d flag).
# Reads the global connection settings (HOST, PORT, USERNAME, NO_PASSWORD,
# FORCE_PASSWORD) and prints one argument per line, suitable for `mapfile -t`.
# Empty settings are skipped.
# Prints nothing at all when no setting applies: a bare `printf '%s\n'` with
# zero arguments would still emit one blank line, which callers would turn
# into a spurious empty argument.
build_pg_dumpall_args() {
    local -a args=()

    [[ -n "$HOST" ]] && args+=(-h "$HOST")
    [[ -n "$PORT" ]] && args+=(-p "$PORT")
    [[ -n "$USERNAME" ]] && args+=(-U "$USERNAME")
    [[ "$NO_PASSWORD" -eq 1 ]] && args+=(-w)
    [[ "$FORCE_PASSWORD" -eq 1 ]] && args+=(-W)

    if (( ${#args[@]} > 0 )); then
        printf '%s\n' "${args[@]}"
    fi
}
417
# Produce a chain ID from the current UTC time.
# Output: a compact ISO 8601 timestamp (YYYYMMDDTHHMMSSZ) on stdout; the
# fixed-width, most-significant-first layout sorts chronologically as text.
get_chain_id() {
    local stamp_format='+%Y%m%dT%H%M%SZ'
    date -u "$stamp_format"
}
422
# Get human-readable file size
# Arguments:
#   $1 - file path
# Returns:
#   Echoes the size reported by `du -h` (e.g., "1.2M", "5.4K");
#   echoes nothing if du cannot read the path (its errors are suppressed).
get_file_size() {
    local file_path="$1"
    # "--" ends option parsing so a path starting with "-" is not taken as a flag
    du -h -- "$file_path" 2>/dev/null | cut -f1
}
432
# Test database connection.
# Runs a trivial query through psql using the global connection settings.
# On failure, logs the connection details together with psql's own error
# message and exits with EXIT_CONNECTION_ERROR. On success, logs a
# confirmation only in verbose mode.
test_connection() {
    log_step "Testing database connection..."

    local -a psql_args
    mapfile -t psql_args < <(build_psql_args)

    # -X: skip ~/.psqlrc so user settings cannot interfere with the probe.
    # "2>&1 >/dev/null" captures only stderr; the query result is discarded.
    local psql_error
    if ! psql_error=$(psql -X "${psql_args[@]}" -c "SELECT version();" 2>&1 >/dev/null); then
        log_error "Failed to connect to database"
        log_error "Connection details: host=$HOST port=$PORT dbname=$DBNAME user=$USERNAME"
        if [[ -n "$psql_error" ]]; then
            log_error "$psql_error"
        fi
        exit "$EXIT_CONNECTION_ERROR"
    fi

    if [[ "$VERBOSE" -eq 1 ]]; then
        log_success "Connected to database"
    fi
}
450
# Execute SQL query and return result.
# Arguments:
#   $1 - SQL text
# Output:
#   psql's output in unaligned, tuples-only form (-t -A), with stderr merged
#   into stdout so callers also capture error text.
# Returns:
#   psql's exit status.
query_db() {
    local sql="$1"
    local -a psql_args
    mapfile -t psql_args < <(build_psql_args)
    # -X: do not read ~/.psqlrc; settings there (e.g. \timing) would add
    # extra lines to the output that callers parse.
    psql -X "${psql_args[@]}" -t -A -c "$sql" 2>&1
}
458
# Execute SQL query silently (return exit code only).
# Arguments:
#   $1 - SQL text
# Returns:
#   psql's exit status; all output (stdout and stderr) is discarded.
query_db_silent() {
    local sql="$1"
    local -a psql_args
    mapfile -t psql_args < <(build_psql_args)
    # -X: do not read ~/.psqlrc, so user settings (e.g. AUTOCOMMIT off)
    # cannot change how the statement is executed.
    psql -X "${psql_args[@]}" -t -A -c "$sql" >/dev/null 2>&1
}
466
# Dump cluster-wide objects (pg_dumpall --globals-only) into a chain directory.
# Arguments:
#   $1 - chain directory path
# Output:
#   Path of the written file ("<chain dir>/globals.sql") on stdout
# Failure:
#   Logs an error, deletes any partial output file and exits with
#   EXIT_BACKUP_ERROR
take_globals_backup() {
    local chain_dir="$1"
    local target="$chain_dir/globals.sql"
    local -a cmd_args
    local size_label

    log_info "Taking globals backup..."

    # Connection arguments first, then the dump-specific switches
    mapfile -t cmd_args < <(build_pg_dumpall_args)
    cmd_args+=(--globals-only --file="$target")

    if ! pg_dumpall "${cmd_args[@]}"; then
        log_error "Globals backup failed"
        # Best effort: do not leave a truncated dump behind
        rm -f "$target" 2>/dev/null || true
        exit "$EXIT_BACKUP_ERROR"
    fi

    size_label=$(get_file_size "$target")
    log_success "Globals backup completed ($size_label)"
    echo "$target"
}
499
# Validate required arguments for a command
# Arguments: command_name arg_name:description [arg_name:description ...]
#   command_name - action name used in the error message ("--<command_name>")
#   arg_name     - name of the global variable that must be non-empty
#   description  - how the option is shown to the user in the error message
# Example: validate_required_args "init" "DBNAME:database" "BACKUP_DIR:backup directory"
# Every missing argument is reported before exiting with
# EXIT_VALIDATION_ERROR, so the user sees the complete list at once.
validate_required_args() {
    local command_name="$1"
    shift

    local validation_failed=0
    # Declared local so the loop does not leak variables into global scope
    local arg_spec arg_name arg_description

    for arg_spec in "$@"; do
        arg_name="${arg_spec%%:*}"
        arg_description="${arg_spec#*:}"

        # Indirect reference; ":-" treats an unset variable like an empty one
        # instead of aborting under `set -u`
        if [[ -z "${!arg_name:-}" ]]; then
            log_error "--${command_name} requires ${arg_description}"
            validation_failed=1
        fi
    done

    if [[ "$validation_failed" -eq 1 ]]; then
        exit "$EXIT_VALIDATION_ERROR"
    fi
}
524
# Check replication slot existence
# Arguments:
#   $1 - slot name
#   $2 - should_exist: 1 if slot should exist, 0 if slot should NOT exist
# Exits with EXIT_SLOT_ERROR if the expectation is not met, or if the lookup
# itself fails or returns something other than a count. Logs a success
# message when an expected slot is found.
check_replication_slot() {
    local slot_name="$1"
    local should_exist="$2"

    # Double any single quote so the name is a well-formed SQL string literal
    local quote="'"
    local slot_literal="${slot_name//$quote/$quote$quote}"

    # query_db merges stderr into its output, so a failed lookup yields error
    # text rather than a number; verify before comparing numerically.
    local slot_exists
    if ! slot_exists=$(query_db "SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$slot_literal';") \
        || [[ ! "$slot_exists" =~ ^[0-9]+$ ]]; then
        log_error "Failed to check replication slot '$slot_name'"
        if [[ -n "$slot_exists" ]]; then
            log_error "$slot_exists"
        fi
        exit "$EXIT_SLOT_ERROR"
    fi

    if [[ "$should_exist" -eq 0 ]]; then
        # Slot should NOT exist
        if [[ "$slot_exists" -gt 0 ]]; then
            log_error "Replication slot '$slot_name' already exists"
            log_error ""
            log_error "A replication slot with this name already exists in the database."
            log_error "This may indicate:"
            log_error "  - A previous initialization that was not cleaned up"
            log_error "  - Another pg_scribe instance using the same slot name"
            log_error ""
            log_error "To resolve:"
            log_error "  - Use a different slot name with -S/--slot option"
            log_error "  - Or drop the existing slot (if safe):"
            log_error "    psql -d $DBNAME -c \"SELECT pg_drop_replication_slot('$slot_name');\""
            exit "$EXIT_SLOT_ERROR"
        fi
    else
        # Slot should exist
        if [[ "$slot_exists" -eq 0 ]]; then
            log_error "Replication slot '$slot_name' does not exist"
            log_error ""
            log_error "You must initialize the backup system first:"
            log_error "  pg_scribe --init -d $DBNAME -f <backup_dir> -S $slot_name"
            log_error ""
            log_error "Or verify the slot name is correct with:"
            log_error "  psql -d $DBNAME -c \"SELECT slot_name FROM pg_replication_slots;\""
            exit "$EXIT_SLOT_ERROR"
        fi
        log_success "Replication slot '$slot_name' found"
    fi
}
568
#
# --init command implementation
#
# Initializes a backup directory with its first chain:
#   Phase 1 validates the server (wal_level, max_replication_slots,
#           max_wal_senders, replica identity) and warns about unlogged
#           tables and large objects;
#   Phase 2 creates the backup/chain directories, the wal2sql extension and
#           the replication slot, then writes base.sql, globals.sql and
#           metadata.json into the chain directory.
# Globals read: DBNAME, BACKUP_DIR, SLOT, VERBOSE, FORCE, IF_NOT_EXISTS, VERSION
# Always exits the script: EXIT_SUCCESS, EXIT_WARNING (completed, but
# warnings were raised) or an error code. On failure or on SIGINT/SIGTERM the
# slot, the files and the (empty) chain directory created so far are removed.
#
cmd_init() {
    log_step "Initializing pg_scribe backup system"

    # Validate required arguments
    validate_required_args "init" "DBNAME:-d/--dbname" "BACKUP_DIR:-f/--file (backup directory)"

    # Cleanup tracking for failure handling
    local CREATED_SLOT=""
    local CREATED_CHAIN_DIR=""
    local CREATED_FILES=()

    # Cleanup function for handling failures
    # shellcheck disable=SC2317  # Function called via trap handler
    cleanup_on_failure() {
        local exit_code=$?
        local file

        # Only cleanup on actual failure, not on successful exit
        if [[ $exit_code -ne 0 && $exit_code -ne $EXIT_WARNING ]]; then
            log_info "Cleaning up after failed initialization..."

            # Drop replication slot if we created it
            if [[ -n "$CREATED_SLOT" ]]; then
                log_info "Dropping replication slot '$CREATED_SLOT'..."
                query_db "SELECT pg_drop_replication_slot('$CREATED_SLOT');" 2>/dev/null || true
            fi

            # Remove files we created
            # (the ${arr[@]+...} form keeps an empty array safe under `set -u`
            # on older bash versions)
            for file in ${CREATED_FILES[@]+"${CREATED_FILES[@]}"}; do
                if [[ -f "$file" ]]; then
                    log_info "Removing partial file: $file"
                    rm -f "$file" 2>/dev/null || true
                fi
            done

            # Remove the chain directory we created. A leftover 'chain-*'
            # directory would make the next --init believe the backup
            # directory is already initialized. rmdir only succeeds on an
            # empty directory, so nothing else is ever deleted.
            if [[ -n "$CREATED_CHAIN_DIR" && -d "$CREATED_CHAIN_DIR" ]]; then
                log_info "Removing chain directory: $CREATED_CHAIN_DIR"
                rmdir "$CREATED_CHAIN_DIR" 2>/dev/null || true
            fi

            log_info "Cleanup complete"
        fi
    }

    # Set up cleanup trap.
    # Cleanup runs from the EXIT trap only. The signal traps turn SIGINT and
    # SIGTERM into a real exit (128 + signal number); otherwise the script
    # would resume after the handler returned and keep initializing with its
    # slot and files already removed.
    trap cleanup_on_failure EXIT
    trap 'exit 130' INT
    trap 'exit 143' TERM

    # Test connection first
    test_connection

    # Phase 1: Validation
    log_step "Phase 1: Validation"

    local validation_failed=0
    local has_warnings=0

    # Check wal_level
    log_info "Checking wal_level configuration..."
    local wal_level
    wal_level=$(query_db "SHOW wal_level;")
    if [[ "$wal_level" != "logical" ]]; then
        log_error "CRITICAL: wal_level is '$wal_level', must be 'logical'"
        log_error "  Fix: Add 'wal_level = logical' to postgresql.conf and restart PostgreSQL"
        validation_failed=1
    else
        if [[ "$VERBOSE" -eq 1 ]]; then
            log_success "wal_level = logical"
        fi
    fi

    # Check max_replication_slots
    log_info "Checking max_replication_slots configuration..."
    local max_slots
    max_slots=$(query_db "SHOW max_replication_slots;")
    if [[ "$max_slots" -lt 1 ]]; then
        log_error "CRITICAL: max_replication_slots is $max_slots, must be >= 1"
        log_error "  Fix: Add 'max_replication_slots = 10' to postgresql.conf and restart PostgreSQL"
        validation_failed=1
    else
        if [[ "$VERBOSE" -eq 1 ]]; then
            log_success "max_replication_slots = $max_slots"
        fi
    fi

    # Check max_wal_senders
    log_info "Checking max_wal_senders configuration..."
    local max_senders
    max_senders=$(query_db "SHOW max_wal_senders;")
    if [[ "$max_senders" -lt 1 ]]; then
        log_error "CRITICAL: max_wal_senders is $max_senders, must be >= 1"
        log_error "  Fix: Add 'max_wal_senders = 10' to postgresql.conf and restart PostgreSQL"
        validation_failed=1
    else
        if [[ "$VERBOSE" -eq 1 ]]; then
            log_success "max_wal_senders = $max_senders"
        fi
    fi

    # Check replica identity on all tables.
    # Inadequate means: REPLICA IDENTITY NOTHING ('n', no old-row data is
    # logged, with or without a primary key), or DEFAULT ('d') on a table
    # that has no primary key to serve as the identity.
    log_info "Checking replica identity for all tables..."
    local bad_tables
    bad_tables=$(query_db "
        SELECT n.nspname || '.' || c.relname
        FROM pg_class c
        JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE c.relkind = 'r'
          AND n.nspname NOT IN ('pg_catalog', 'information_schema')
          AND (
              c.relreplident = 'n'
              OR (
                  c.relreplident = 'd'
                  AND NOT EXISTS (
                      SELECT 1 FROM pg_index i
                      WHERE i.indrelid = c.oid AND i.indisprimary
                  )
              )
          )
        ORDER BY n.nspname, c.relname;
    ")

    if [[ -n "$bad_tables" ]]; then
        log_error "CRITICAL: The following tables lack adequate replica identity:"
        while IFS= read -r table; do
            log_error "  - $table"
        done <<< "$bad_tables"
        log_error "  Fix: Add a primary key or set replica identity:"
        log_error "    ALTER TABLE <table> ADD PRIMARY KEY (id);"
        log_error "    -- OR --"
        log_error "    ALTER TABLE <table> REPLICA IDENTITY FULL;"
        validation_failed=1
    else
        if [[ "$VERBOSE" -eq 1 ]]; then
            log_success "All tables have adequate replica identity"
        fi
    fi

    # Warning: Check for unlogged tables
    log_info "Checking for unlogged tables..."
    local unlogged_tables
    unlogged_tables=$(query_db "
        SELECT n.nspname || '.' || c.relname
        FROM pg_class c
        JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE c.relkind = 'r'
          AND c.relpersistence = 'u'
          AND n.nspname NOT IN ('pg_catalog', 'information_schema')
        ORDER BY n.nspname, c.relname;
    ")

    if [[ -n "$unlogged_tables" ]]; then
        log_warning "The following unlogged tables will NOT be backed up:"
        while IFS= read -r table; do
            log_warning "  - $table"
        done <<< "$unlogged_tables"
        has_warnings=1
    fi

    # Warning: Check for large objects
    log_info "Checking for large objects..."
    local large_object_count
    large_object_count=$(query_db "SELECT count(*) FROM pg_largeobject_metadata;")

    if [[ "$large_object_count" -gt 0 ]]; then
        log_warning "Database contains $large_object_count large objects"
        log_warning "Large objects are NOT incrementally backed up (only in full backups)"
        log_warning "Consider using BYTEA columns instead for incremental backup support"
        has_warnings=1
    fi

    # Check if validation failed
    if [[ "$validation_failed" -eq 1 ]]; then
        if [[ "$FORCE" -eq 1 ]]; then
            log_warning "Validation failed but --force specified, continuing anyway..."
        else
            log_error "Validation failed. Fix the CRITICAL issues above and try again."
            log_error "Or use --force to skip validation (NOT recommended)."
            exit "$EXIT_VALIDATION_ERROR"
        fi
    else
        log_success "All validation checks passed"
    fi

    # Phase 2: Setup
    log_step "Phase 2: Setup"

    # Create backup directory
    log_info "Checking backup directory..."
    if [[ ! -d "$BACKUP_DIR" ]]; then
        if ! mkdir -p "$BACKUP_DIR"; then
            log_error "Failed to create backup directory: $BACKUP_DIR"
            exit "$EXIT_BACKUP_ERROR"
        fi
        log_success "Created backup directory: $BACKUP_DIR"
    else
        # Directory exists - check if already initialized (has chains)
        local existing_chains
        existing_chains=$(find "$BACKUP_DIR" -maxdepth 1 -type d -name 'chain-*' 2>/dev/null | head -1)

        if [[ -n "$existing_chains" ]]; then
            if [[ "$IF_NOT_EXISTS" -eq 1 ]]; then
                log_info "Backup directory already initialized (--if-not-exists specified)"
                log_info "Skipping initialization"
                exit "$EXIT_SUCCESS"
            else
                log_error "Backup directory already initialized: $BACKUP_DIR"
                log_error "Found existing chain(s)"
                log_error ""
                log_error "This directory has already been initialized with pg_scribe."
                log_error "To create a new chain, use: pg_scribe --new-chain"
                log_error ""
                log_error "If you want to re-initialize from scratch:"
                log_error "  1. Stop any running backup processes"
                log_error "  2. Drop the replication slot (or verify it's safe to reuse)"
                log_error "  3. Remove or rename the existing backup directory"
                exit "$EXIT_VALIDATION_ERROR"
            fi
        fi

        log_info "Using existing directory: $BACKUP_DIR"
    fi

    # Generate chain ID and create chain directory
    local chain_id
    chain_id=$(get_chain_id)
    local chain_dir="$BACKUP_DIR/chain-$chain_id"

    log_info "Creating initial chain: $chain_id"
    if ! mkdir -p "$chain_dir"; then
        log_error "Failed to create chain directory: $chain_dir"
        exit "$EXIT_BACKUP_ERROR"
    fi
    CREATED_CHAIN_DIR="$chain_dir"  # Track for cleanup

    # Create wal2sql extension
    log_info "Creating wal2sql extension..."
    if query_db_silent "CREATE EXTENSION IF NOT EXISTS wal2sql;"; then
        log_success "wal2sql extension created (or already exists)"
    else
        log_error "Failed to create wal2sql extension"
        log_error "Ensure wal2sql.so is installed in PostgreSQL's lib directory"
        log_error "Run: cd wal2sql && make && make install"
        exit "$EXIT_GENERAL_ERROR"
    fi

    # Create replication slot with snapshot export
    log_info "Creating logical replication slot '$SLOT'..."

    # Check if slot already exists
    check_replication_slot "$SLOT" 0

    # Create slot using SQL
    # Note: For POC, we create the slot and take the base backup sequentially
    # The slot will preserve WAL from its creation LSN forward, ensuring no changes are lost
    local slot_result
    if ! slot_result=$(query_db "SELECT slot_name, lsn FROM pg_create_logical_replication_slot('$SLOT', 'wal2sql');"); then
        log_error "Failed to create replication slot"
        log_error "$slot_result"
        exit "$EXIT_SLOT_ERROR"
    fi

    CREATED_SLOT="$SLOT"  # Track for cleanup
    log_success "Replication slot '$SLOT' created"

    # Take base backup immediately after slot creation
    # The slot preserves WAL from its creation point, so all changes will be captured
    local base_backup_file="$chain_dir/base.sql"
    CREATED_FILES+=("$base_backup_file")  # Track for cleanup
    log_info "Taking base backup..."

    local psql_args
    mapfile -t psql_args < <(build_psql_args)
    if pg_dump "${psql_args[@]}" --file="$base_backup_file"; then
        local base_size
        base_size=$(get_file_size "$base_backup_file")
        log_success "Base backup completed ($base_size)"
    else
        log_error "Base backup failed"
        exit "$EXIT_BACKUP_ERROR"
    fi

    # Take globals backup
    local globals_backup_file
    globals_backup_file=$(take_globals_backup "$chain_dir")
    CREATED_FILES+=("$globals_backup_file")  # Track for cleanup

    # Generate metadata file
    log_info "Generating metadata file..."
    local metadata_file="$chain_dir/metadata.json"
    CREATED_FILES+=("$metadata_file")  # Track for cleanup
    local pg_version
    pg_version=$(query_db "SELECT version();")

    cat > "$metadata_file" <<EOF
{
  "chain_id": "$chain_id",
  "created": "$(date -u +"%Y-%m-%dT%H:%M:%SZ")",
  "pg_scribe_version": "$VERSION",
  "database": "$DBNAME",
  "replication_slot": "$SLOT",
  "postgresql_version": "$pg_version",
  "encoding": "$(query_db "SELECT pg_encoding_to_char(encoding) FROM pg_database WHERE datname = '$DBNAME';")",
  "collation": "$(query_db "SELECT datcollate FROM pg_database WHERE datname = '$DBNAME';")"
}
EOF

    log_success "Metadata file created"

    # Disable cleanup trap on successful completion
    trap - EXIT INT TERM

    # Final summary
    echo >&2
    log_step "Initialization Complete"
    log_success "Initial chain created: $chain_id"
    log_success "Location: $chain_dir"
    log_success "Replication slot: $SLOT"
    log_info "Next steps:"
    log_info "  1. Start streaming incremental backups:"
    log_info "     pg_scribe --start -d $DBNAME -f $BACKUP_DIR"
    log_info "  2. Monitor replication slot health:"
    log_info "     pg_scribe --status -d $DBNAME -S $SLOT -f $BACKUP_DIR"

    if [[ "$has_warnings" -eq 1 ]]; then
        exit "$EXIT_WARNING"
    else
        exit "$EXIT_SUCCESS"
    fi
}
887
888 #
889 # --start command implementation
890 #
# Stream logical-replication output into the newest chain under BACKUP_DIR.
#
# Globals read:    DBNAME, BACKUP_DIR, STATUS_INTERVAL, FSYNC_INTERVAL,
#                  EXIT_BACKUP_ERROR, EXIT_GENERAL_ERROR
# Globals written: SLOT (replaced by the slot name stored in the chain's
#                  metadata.json)
# Side effects:    writes $BACKUP_DIR/.pg_scribe.pid, then replaces the
#                  current process with pg_recvlogical (never returns).
# Exits:           EXIT_BACKUP_ERROR when the backup directory, a chain, or
#                  usable chain metadata is missing; EXIT_GENERAL_ERROR when
#                  the pidfile points at a live process. The helper functions
#                  called here may exit with their own codes.
cmd_start() {
    log_step "Starting incremental backup collection"

    # Validate required arguments
    validate_required_args "start" "DBNAME:-d/--dbname" "BACKUP_DIR:-f/--file (backup directory)"

    # Verify backup directory exists
    if [[ ! -d "$BACKUP_DIR" ]]; then
        log_error "Backup directory does not exist: $BACKUP_DIR"
        log_error "Run --init first to initialize the backup system"
        exit "$EXIT_BACKUP_ERROR"
    fi

    # Find latest chain (chain directory names sort chronologically)
    log_step "Finding latest chain..."
    local latest_chain
    latest_chain=$(find "$BACKUP_DIR" -maxdepth 1 -type d -name 'chain-*' 2>/dev/null | sort | tail -1)

    if [[ -z "$latest_chain" ]]; then
        log_error "No chains found in backup directory: $BACKUP_DIR"
        log_error "Run --init first to create the initial chain"
        exit "$EXIT_BACKUP_ERROR"
    fi

    local chain_id
    chain_id=$(basename "$latest_chain" | sed 's/^chain-//')
    log_success "Found latest chain: $chain_id"

    # Read replication slot from chain metadata
    local metadata_file="$latest_chain/metadata.json"
    if [[ ! -f "$metadata_file" ]]; then
        log_error "Chain metadata not found: $metadata_file"
        log_error "Chain may be corrupted. Create a new chain with:"
        log_error "  pg_scribe --new-chain -d $DBNAME -f $BACKUP_DIR"
        exit "$EXIT_BACKUP_ERROR"
    fi

    # grep exits non-zero when the field is absent; with errexit + pipefail
    # that would abort the script before the diagnostic below is printed, so
    # the no-match case is tolerated here and reported explicitly instead.
    SLOT=$({ grep -m1 '"replication_slot"' "$metadata_file" || true; } | cut -d'"' -f4)
    if [[ -z "$SLOT" ]]; then
        log_error "Chain metadata missing replication_slot field"
        log_error "Chain may be corrupted. Create a new chain with:"
        log_error "  pg_scribe --new-chain -d $DBNAME -f $BACKUP_DIR"
        exit "$EXIT_BACKUP_ERROR"
    fi

    log_info "Using replication slot from metadata: $SLOT"

    # Check for existing streaming process
    local pidfile="$BACKUP_DIR/.pg_scribe.pid"
    if [[ -f "$pidfile" ]]; then
        local existing_pid
        existing_pid=$(cat "$pidfile")

        # Check if process is still running
        if kill -0 "$existing_pid" 2>/dev/null; then
            log_error "Already streaming to $BACKUP_DIR (PID $existing_pid)"
            log_error "Stop the existing process first or use a different backup directory"
            exit "$EXIT_GENERAL_ERROR"
        else
            log_info "Removing stale pidfile (process $existing_pid not running)"
            rm -f "$pidfile"
        fi
    fi

    # Test connection
    test_connection

    # Verify replication slot exists
    log_step "Verifying replication slot '$SLOT'..."
    check_replication_slot "$SLOT" 1

    # Determine output file
    local output_file="$latest_chain/active.sql"

    # Build pg_recvlogical arguments (connection options come from the helper)
    local pg_recv_args=()
    mapfile -t pg_recv_args < <(build_pg_recvlogical_args)

    pg_recv_args+=(
        --slot="$SLOT"
        --start
        --file="$output_file"
        --option=include_transaction=on
        --status-interval="$STATUS_INTERVAL"
    )

    # Add fsync interval (0 means disabled)
    if [[ "$FSYNC_INTERVAL" -gt 0 ]]; then
        pg_recv_args+=(--fsync-interval="$FSYNC_INTERVAL")
    else
        # For fsync-interval=0, we skip the parameter to avoid pg_recvlogical errors
        log_info "Fsync disabled (fsync-interval=0)"
    fi

    # Display configuration
    log_step "Configuration"
    log_info "Database: $DBNAME"
    log_info "Replication slot: $SLOT"
    log_info "Chain: $chain_id"
    log_info "Output file: $output_file"
    log_info "Status interval: ${STATUS_INTERVAL}s"
    if [[ "$FSYNC_INTERVAL" -gt 0 ]]; then
        log_info "Fsync interval: ${FSYNC_INTERVAL}s"
    else
        log_info "Fsync: disabled"
    fi
    echo >&2

    # Write pidfile before exec (PID stays same after exec)
    printf '%s\n' "$$" > "$pidfile"

    # Start streaming - replace this process with pg_recvlogical
    log_step "Starting streaming replication..."
    log_info "Press Ctrl+C to stop"
    log_info "Send SIGHUP to rotate output file"
    echo >&2

    # Replace this process with pg_recvlogical
    # This eliminates signal forwarding issues and prevents orphaned processes
    # The PID stays the same, making cleanup in tests more reliable
    exec pg_recvlogical "${pg_recv_args[@]}"
}
1016
1017 #
1018 # --stop command implementation
1019 #
# Stop the pg_recvlogical process recorded in $BACKUP_DIR/.pg_scribe.pid.
#
# Globals read: BACKUP_DIR, EXIT_SUCCESS, EXIT_BACKUP_ERROR, EXIT_GENERAL_ERROR
# Side effects: sends SIGTERM (then SIGKILL after a 30 second grace period)
#               to the recorded PID and removes the pidfile.
# Exits:        EXIT_SUCCESS once the process is gone or the pidfile was stale;
#               EXIT_BACKUP_ERROR if the backup directory is missing;
#               EXIT_GENERAL_ERROR if there is no pidfile or the PID does not
#               belong to pg_recvlogical.
cmd_stop() {
    log_step "Stopping active streaming process"

    # Validate required arguments
    validate_required_args "stop" "BACKUP_DIR:-f/--file (backup directory)"

    # Verify backup directory exists
    if [[ ! -d "$BACKUP_DIR" ]]; then
        log_error "Backup directory does not exist: $BACKUP_DIR"
        exit "$EXIT_BACKUP_ERROR"
    fi

    # Find and validate pidfile
    local pidfile="$BACKUP_DIR/.pg_scribe.pid"
    if [[ ! -f "$pidfile" ]]; then
        log_error "No active streaming process found"
        log_error "Pidfile not found: $pidfile"
        exit "$EXIT_GENERAL_ERROR"
    fi

    local pid
    pid=$(cat "$pidfile")

    # Verify process is running
    if ! kill -0 "$pid" 2>/dev/null; then
        log_warning "Stale pidfile (process $pid not running)"
        log_info "Removing stale pidfile"
        rm -f "$pidfile"
        exit "$EXIT_SUCCESS"
    fi

    # Verify process is pg_recvlogical so an unrelated process that happens to
    # hold a recycled PID is never signalled
    local proc_name
    proc_name=$(ps -p "$pid" -o comm= 2>/dev/null || echo "")
    if [[ "$proc_name" != "pg_recvlogical" ]]; then
        log_error "PID $pid is not pg_recvlogical (found: $proc_name)"
        log_error "Not stopping non-pg_recvlogical process"
        exit "$EXIT_GENERAL_ERROR"
    fi

    log_info "Found pg_recvlogical process (PID $pid)"

    # Find active chain (informational only). A glob loop is used instead of
    # `find ... | head -1` because find exits non-zero when no active.sql
    # exists, which under errexit + pipefail would abort before the process
    # is ever signalled.
    local active_file=""
    local candidate
    for candidate in "$BACKUP_DIR"/chain-*/active.sql; do
        if [[ -e "$candidate" ]]; then
            active_file="$candidate"
            break
        fi
    done

    if [[ -n "$active_file" ]]; then
        local chain_dir
        chain_dir=$(dirname "$active_file")
        local chain_id
        chain_id=$(basename "$chain_dir" | sed 's/^chain-//')
        log_info "Active chain: $chain_id"
    fi

    # Send SIGTERM to gracefully stop the process. A failure here means the
    # process exited after the liveness check above, which is the goal anyway.
    log_info "Sending SIGTERM to process $pid..."
    kill -TERM "$pid" 2>/dev/null || true

    # Wait for process to stop (with timeout)
    local timeout=30
    local waited=0
    log_info "Waiting for process to stop..."

    while kill -0 "$pid" 2>/dev/null && [[ $waited -lt $timeout ]]; do
        sleep 1
        waited=$((waited + 1))
    done

    # Escalate if the process ignored SIGTERM
    if kill -0 "$pid" 2>/dev/null; then
        log_warning "Process did not stop gracefully, sending SIGKILL..."
        kill -KILL "$pid" 2>/dev/null || true
        sleep 1
    fi

    # Remove pidfile
    rm -f "$pidfile"

    log_success "Streaming process stopped"
    exit "$EXIT_SUCCESS"
}
1101
1102 #
1103 # --rotate-diff command implementation
1104 #
# Seal the current active.sql as diff-<timestamp>.sql and have the running
# pg_recvlogical start a fresh active.sql.
#
# Globals read: BACKUP_DIR, EXIT_SUCCESS, EXIT_BACKUP_ERROR, EXIT_GENERAL_ERROR
# Side effects: renames <chain>/active.sql and sends SIGHUP to the PID stored
#               in $BACKUP_DIR/.pg_scribe.pid.
# Exits:        EXIT_SUCCESS when a new active.sql appears within 30 seconds;
#               EXIT_BACKUP_ERROR when the backup directory or active.sql is
#               missing, the target diff name is taken, or the rename fails;
#               EXIT_GENERAL_ERROR for pidfile/process problems, a failed
#               SIGHUP, or a timeout.
cmd_rotate_diff() {
    log_step "Rotating differential file"

    # Validate required arguments
    validate_required_args "rotate-diff" "BACKUP_DIR:-f/--file (backup directory)"

    # Verify backup directory exists
    if [[ ! -d "$BACKUP_DIR" ]]; then
        log_error "Backup directory does not exist: $BACKUP_DIR"
        exit "$EXIT_BACKUP_ERROR"
    fi

    # Find and validate pidfile
    local pidfile="$BACKUP_DIR/.pg_scribe.pid"
    if [[ ! -f "$pidfile" ]]; then
        log_error "No active streaming process found"
        log_error "Pidfile not found: $pidfile"
        exit "$EXIT_GENERAL_ERROR"
    fi

    local pid
    pid=$(cat "$pidfile")

    # Verify process is running
    if ! kill -0 "$pid" 2>/dev/null; then
        log_error "Stale pidfile (process $pid not running)"
        log_error "Remove $pidfile and start streaming with --start"
        exit "$EXIT_GENERAL_ERROR"
    fi

    # Verify process is pg_recvlogical
    local proc_name
    proc_name=$(ps -p "$pid" -o comm= 2>/dev/null || echo "")
    if [[ "$proc_name" != "pg_recvlogical" ]]; then
        log_error "PID $pid is not pg_recvlogical (found: $proc_name)"
        exit "$EXIT_GENERAL_ERROR"
    fi

    log_success "Found active streaming process (PID $pid)"

    # Find active.sql file. A glob loop is used instead of `find ... | head -1`
    # because find exits non-zero when nothing matches, and under errexit +
    # pipefail that would abort before the error below could be reported.
    # NOTE(review): if several chains contain an active.sql, the first one in
    # glob order is used, as before - confirm only one can exist at a time.
    local active_file=""
    local candidate
    for candidate in "$BACKUP_DIR"/chain-*/active.sql; do
        if [[ -e "$candidate" ]]; then
            active_file="$candidate"
            break
        fi
    done

    if [[ -z "$active_file" ]]; then
        log_error "No active.sql found in any chain"
        exit "$EXIT_BACKUP_ERROR"
    fi

    local chain_dir
    chain_dir=$(dirname "$active_file")
    local chain_id
    chain_id=$(basename "$chain_dir" | sed 's/^chain-//')

    log_success "Found active chain: $chain_id"

    # Generate differential timestamp
    local diff_timestamp
    diff_timestamp=$(get_chain_id)
    local sealed_file="$chain_dir/diff-$diff_timestamp.sql"

    # Never clobber an already sealed differential
    if [[ -e "$sealed_file" ]]; then
        log_error "Sealed differential already exists: $sealed_file"
        exit "$EXIT_BACKUP_ERROR"
    fi

    # Get file size before rotation
    local active_size
    active_size=$(get_file_size "$active_file")

    # Atomic rotation
    log_info "Rotating active.sql to diff-$diff_timestamp.sql"

    # Rename active -> diff (pg_recvlogical still has file open)
    if ! mv "$active_file" "$sealed_file"; then
        log_error "Failed to rename active.sql"
        exit "$EXIT_BACKUP_ERROR"
    fi

    # Send SIGHUP to trigger file rotation
    log_info "Sending SIGHUP to pg_recvlogical..."
    if ! kill -HUP "$pid" 2>/dev/null; then
        # The rename has already happened, so say so instead of dying silently
        log_error "Failed to send SIGHUP to pg_recvlogical (PID $pid)"
        log_error "active.sql was already renamed to diff-$diff_timestamp.sql"
        exit "$EXIT_GENERAL_ERROR"
    fi

    # Wait for new active.sql to appear
    local timeout=30
    local waited=0
    log_info "Waiting for new active.sql..."

    while [[ $waited -lt $timeout ]]; do
        if [[ -f "$chain_dir/active.sql" ]]; then
            # Wait a moment to ensure it's being written, then re-check that
            # the file is still there (it may legitimately still be empty)
            sleep 1
            if [[ -f "$chain_dir/active.sql" ]]; then
                log_success "Rotated differential: diff-$diff_timestamp.sql ($active_size)"
                log_success "New differential started"
                exit "$EXIT_SUCCESS"
            fi
        fi
        sleep 1
        waited=$((waited + 1))
    done

    log_error "Timeout waiting for new active.sql"
    log_error "The rotation may have failed - check pg_recvlogical process"
    exit "$EXIT_GENERAL_ERROR"
}
1206
1207 #
1208 # --new-chain command implementation
1209 #
# Create a new chain (base backup, globals backup, metadata.json) under
# BACKUP_DIR and, when AUTO_START is 1, move streaming over to it.
#
# Globals read: DBNAME, BACKUP_DIR, SLOT, COMPRESS, AUTO_START, VERSION,
#               STATUS_INTERVAL, FSYNC_INTERVAL, EXIT_SUCCESS,
#               EXIT_BACKUP_ERROR, EXIT_VALIDATION_ERROR
# Side effects: creates $BACKUP_DIR/chain-<id>/ and its files. With
#               AUTO_START=1 it also stops the running pg_recvlogical, seals
#               the old chain's active.sql, writes the pidfile and replaces
#               this process with pg_recvlogical (never returns).
# Exits:        EXIT_SUCCESS after printing the summary (AUTO_START=0);
#               EXIT_VALIDATION_ERROR for an unknown or unavailable
#               compression method; EXIT_BACKUP_ERROR when the backup
#               directory is missing or the chain directory / base backup
#               cannot be created. The helper functions called here may exit
#               with their own codes.
cmd_new_chain() {
    log_step "Creating new chain"

    # Phase 1: Validation (no state changes)
    validate_required_args "new-chain" "DBNAME:-d/--dbname" "BACKUP_DIR:-f/--file (backup directory)"

    test_connection

    if [[ ! -d "$BACKUP_DIR" ]]; then
        log_error "Backup directory does not exist: $BACKUP_DIR"
        log_error "Run --init first to initialize the backup system"
        exit "$EXIT_BACKUP_ERROR"
    fi

    # Validate replication slot exists before creating chain
    log_step "Validating replication slot '$SLOT'..."
    check_replication_slot "$SLOT" 1

    # Validate compression method if specified and remember the file
    # extension that goes with it, so the mapping lives in one place.
    local compress_ext=""
    if [[ -n "$COMPRESS" && "$COMPRESS" != "none" ]]; then
        # COMPRESS may carry a detail suffix after ':'; only the method matters here
        local compress_type="${COMPRESS%%:*}"
        case "$compress_type" in
            gzip) compress_ext=".gz" ;;
            lz4) compress_ext=".lz4" ;;
            zstd) compress_ext=".zst" ;;
            *)
                log_error "Unknown compression method: $compress_type"
                log_error "Supported methods: gzip, lz4, zstd, none"
                exit "$EXIT_VALIDATION_ERROR"
                ;;
        esac

        if ! command -v "$compress_type" >/dev/null 2>&1; then
            log_error "Compression method '$compress_type' requires $compress_type command"
            log_error "Install $compress_type or use a different compression method"
            exit "$EXIT_VALIDATION_ERROR"
        fi
    fi

    # Phase 2: Create new chain (validation complete)

    # Generate new chain ID and create directory
    local new_chain_id
    new_chain_id=$(get_chain_id)
    local new_chain_dir="$BACKUP_DIR/chain-$new_chain_id"

    log_info "Creating new chain: $new_chain_id"
    if ! mkdir -p "$new_chain_dir"; then
        log_error "Failed to create chain directory: $new_chain_dir"
        exit "$EXIT_BACKUP_ERROR"
    fi

    # Take new base backup
    log_step "Taking base backup"
    local base_backup_file="$new_chain_dir/base.sql"

    local psql_args
    mapfile -t psql_args < <(build_psql_args)

    # pg_dump takes the same connection arguments as psql
    local pg_dump_args=("${psql_args[@]}")
    if [[ -n "$COMPRESS" ]]; then
        log_info "Compression: $COMPRESS"
        pg_dump_args+=(--compress="$COMPRESS")

        # Add appropriate file extension for compression (empty for "none")
        base_backup_file="${base_backup_file}${compress_ext}"
    fi

    pg_dump_args+=(--file="$base_backup_file")

    if pg_dump "${pg_dump_args[@]}"; then
        local base_size
        base_size=$(get_file_size "$base_backup_file")
        log_success "Base backup completed ($base_size)"
    else
        log_error "Base backup failed"
        # Clean up partial files
        rm -rf -- "${new_chain_dir:?}" 2>/dev/null || true
        exit "$EXIT_BACKUP_ERROR"
    fi

    # Take globals backup (the helper's output is captured but not needed here)
    local globals_backup_file
    globals_backup_file=$(take_globals_backup "$new_chain_dir")

    # Generate metadata file
    log_info "Generating metadata file..."
    local metadata_file="$new_chain_dir/metadata.json"
    local pg_version
    pg_version=$(query_db "SELECT version();")

    cat > "$metadata_file" <<EOF
{
  "chain_id": "$new_chain_id",
  "created": "$(date -u +"%Y-%m-%dT%H:%M:%SZ")",
  "pg_scribe_version": "$VERSION",
  "database": "$DBNAME",
  "replication_slot": "$SLOT",
  "postgresql_version": "$pg_version",
  "encoding": "$(query_db "SELECT pg_encoding_to_char(encoding) FROM pg_database WHERE datname = '$DBNAME';")",
  "collation": "$(query_db "SELECT datcollate FROM pg_database WHERE datname = '$DBNAME';")"
}
EOF

    log_success "Metadata file created"

    # Check if streaming is active. The PID must be alive AND belong to
    # pg_recvlogical: a stale pidfile may name a recycled PID, and with
    # --start that process would otherwise receive SIGTERM/SIGKILL.
    local pidfile="$BACKUP_DIR/.pg_scribe.pid"
    local streaming_active=0
    local old_pid=""

    if [[ -f "$pidfile" ]]; then
        old_pid=$(cat "$pidfile")
        if kill -0 "$old_pid" 2>/dev/null; then
            local old_proc_name
            old_proc_name=$(ps -p "$old_pid" -o comm= 2>/dev/null || echo "")
            if [[ "$old_proc_name" == "pg_recvlogical" ]]; then
                streaming_active=1
            else
                log_warning "Pidfile PID $old_pid is not pg_recvlogical (found: $old_proc_name); treating pidfile as stale"
            fi
        fi
    fi

    # Handle automatic transition if --start was specified
    if [[ "$AUTO_START" -eq 1 ]]; then
        echo >&2
        log_step "Automatic Transition (--start specified)"

        # Stop old streaming process if active
        if [[ $streaming_active -eq 1 ]]; then
            # Identify which chain the old process was streaming to
            local old_chain_file=""
            local old_chain_dir=""
            local old_chain_id=""

            local pg_recv_cmdline
            pg_recv_cmdline=$(ps -p "$old_pid" -o args= 2>/dev/null || echo "")

            # Extract --file=... from the command line (value ends at the
            # first whitespace, so paths containing spaces are not recovered)
            local file_arg_re='--file=([^[:space:]]+)'
            if [[ "$pg_recv_cmdline" =~ $file_arg_re ]]; then
                old_chain_file="${BASH_REMATCH[1]}"
            fi

            if [[ -n "$old_chain_file" ]] && [[ -f "$old_chain_file" ]]; then
                old_chain_dir=$(dirname "$old_chain_file")
                old_chain_id=$(basename "$old_chain_dir" | sed 's/^chain-//')
                log_info "Old chain was streaming to: $old_chain_id"
            fi

            # A failed kill means the process exited on its own in the meantime
            log_info "Stopping old streaming process (PID $old_pid)..."
            kill -TERM "$old_pid" 2>/dev/null || true

            # Wait for process to stop (with timeout)
            local timeout=30
            local waited=0
            log_info "Waiting for process to stop..."

            while kill -0 "$old_pid" 2>/dev/null && [[ $waited -lt $timeout ]]; do
                sleep 1
                waited=$((waited + 1))
            done

            # Escalate if the process ignored SIGTERM
            if kill -0 "$old_pid" 2>/dev/null; then
                log_warning "Process did not stop gracefully, sending SIGKILL..."
                kill -KILL "$old_pid" 2>/dev/null || true
                sleep 1
            fi

            log_success "Old streaming process stopped"

            # Seal the old chain's active.sql if it exists
            if [[ -n "$old_chain_dir" ]] && [[ -f "$old_chain_dir/active.sql" ]]; then
                log_step "Sealing old chain's active.sql"

                local diff_timestamp
                diff_timestamp=$(get_chain_id)
                local sealed_file="$old_chain_dir/diff-$diff_timestamp.sql"

                log_info "Sealing active.sql to diff-$diff_timestamp.sql"
                if mv "$old_chain_dir/active.sql" "$sealed_file"; then
                    local sealed_size
                    sealed_size=$(get_file_size "$sealed_file")
                    log_success "Sealed old chain's differential: diff-$diff_timestamp.sql ($sealed_size)"
                else
                    log_warning "Failed to seal old chain's active.sql"
                fi
            fi
        fi

        # Now start streaming to new chain
        log_step "Starting streaming to new chain: $new_chain_id"

        # Stream into the chain created above rather than re-deriving the
        # "latest" chain from a directory listing.
        local output_file="$new_chain_dir/active.sql"

        # Build pg_recvlogical arguments
        local pg_recv_args=()
        mapfile -t pg_recv_args < <(build_pg_recvlogical_args)

        pg_recv_args+=(
            --slot="$SLOT"
            --start
            --file="$output_file"
            --option=include_transaction=on
            --status-interval="$STATUS_INTERVAL"
        )

        # Add fsync interval (0 means disabled)
        if [[ "$FSYNC_INTERVAL" -gt 0 ]]; then
            pg_recv_args+=(--fsync-interval="$FSYNC_INTERVAL")
        fi

        # Write pidfile before exec (PID stays same after exec)
        printf '%s\n' "$$" > "$pidfile"

        log_info "Output file: $output_file"
        log_info "Press Ctrl+C to stop"
        echo >&2

        # Replace this process with pg_recvlogical
        exec pg_recvlogical "${pg_recv_args[@]}"
    fi

    # Final summary (only reached if --start was NOT specified)
    echo >&2
    log_step "New Chain Complete"
    log_success "Chain created: $new_chain_id"
    log_success "Location: $new_chain_dir"
    log_success "Base backup: $(basename "$base_backup_file")"

    if [[ $streaming_active -eq 1 ]]; then
        echo >&2
        log_info "Active streaming process detected (PID $old_pid)"
        log_info ""
        log_info "To transition to the new chain:"
        log_info "  1. Stop the current streaming process:"
        log_info "     kill -TERM $old_pid"
        log_info "     # Or: pg_scribe --stop -f $BACKUP_DIR"
        log_info "  2. Start streaming to the new chain:"
        log_info "     pg_scribe --start -d $DBNAME -f $BACKUP_DIR"
        log_info ""
        log_info "Or use --rotate-diff to seal a differential before transitioning"
    else
        echo >&2
        log_info "No active streaming process detected"
        log_info ""
        log_info "Start streaming to the new chain:"
        log_info "  pg_scribe --start -d $DBNAME -f $BACKUP_DIR"
    fi

    exit "$EXIT_SUCCESS"
}
1498
1499 #
1500 # --restore command implementation
1501 #
1502 cmd_restore() {
1503     log_step "Restoring database from backup"
1504
1505     # Validate required arguments
1506     validate_required_args "restore" "DBNAME:-d/--dbname (target database)" "BACKUP_DIR:-f/--file (backup directory)"
1507
1508     # Verify backup directory exists
1509     if [[ ! -d "$BACKUP_DIR" ]]; then
1510         log_error "Backup directory does not exist: $BACKUP_DIR"
1511         exit "$EXIT_BACKUP_ERROR"
1512     fi
1513
1514     # Determine target chain
1515     log_step "Locating chain"
1516     local chain_dir=""
1517     local chain_id=""
1518
1519     if [[ -n "$BASE_BACKUP" ]]; then
1520         # BASE_BACKUP can be a chain ID or a specific chain directory
1521         if [[ -d "$BACKUP_DIR/chain-$BASE_BACKUP" ]]; then
1522             chain_dir="$BACKUP_DIR/chain-$BASE_BACKUP"
1523             chain_id="$BASE_BACKUP"
1524             log_info "Using specified chain: $chain_id"
1525         elif [[ -d "$BASE_BACKUP" ]] && [[ "$(basename "$BASE_BACKUP")" =~ ^chain- ]]; then
1526             chain_dir="$BASE_BACKUP"
1527             chain_id=$(basename "$chain_dir" | sed 's/^chain-//')
1528             log_info "Using specified chain directory: $chain_id"
1529         else
1530             log_error "Chain not found: $BASE_BACKUP"
1531             exit "$EXIT_BACKUP_ERROR"
1532         fi
1533     else
1534         # Use latest chain
1535         chain_dir=$(find "$BACKUP_DIR" -maxdepth 1 -type d -name 'chain-*' 2>/dev/null | sort | tail -1)
1536
1537         if [[ -z "$chain_dir" ]]; then
1538             log_error "No chains found in backup directory: $BACKUP_DIR"
1539             log_error "Run --init first to create a chain"
1540             exit "$EXIT_BACKUP_ERROR"
1541         fi
1542
1543         chain_id=$(basename "$chain_dir" | sed 's/^chain-//')
1544         log_success "Found latest chain: $chain_id"
1545     fi
1546
1547     # Verify chain structure
1548     local base_backup_path="$chain_dir/base.sql"
1549     local globals_backup_path="$chain_dir/globals.sql"
1550
1551     # Check for compressed base backup
1552     if [[ ! -f "$base_backup_path" ]]; then
1553         # Try compressed variants
1554         if [[ -f "$base_backup_path.gz" ]]; then
1555             base_backup_path="$base_backup_path.gz"
1556         elif [[ -f "$base_backup_path.lz4" ]]; then
1557             base_backup_path="$base_backup_path.lz4"
1558         elif [[ -f "$base_backup_path.zst" ]]; then
1559             base_backup_path="$base_backup_path.zst"
1560         else
1561             log_error "Base backup not found in chain: $chain_id"
1562             exit "$EXIT_BACKUP_ERROR"
1563         fi
1564     fi
1565
1566     log_info "Base backup: $(basename "$base_backup_path")"
1567
1568     # Find all sealed differentials (sorted by timestamp)
1569     local differential_files=()
1570     mapfile -t differential_files < <(find "$chain_dir" -maxdepth 1 -name 'diff-*.sql' 2>/dev/null | sort)
1571
1572     if [[ ${#differential_files[@]} -gt 0 ]]; then
1573         log_info "Found ${#differential_files[@]} sealed differential(s)"
1574     else
1575         log_info "No sealed differentials found (will restore base backup only)"
1576     fi
1577
1578     # Create target database if requested
1579     if [[ "$CREATE_DB" -eq 1 ]]; then
1580         log_step "Creating target database"
1581
1582         # Connect to postgres database (not target database) to create it
1583         local create_dbname="$DBNAME"
1584         DBNAME="postgres"
1585
1586         # Test connection to postgres database
1587         test_connection
1588
1589         # Check if database already exists
1590         local db_exists
1591         db_exists=$(query_db "SELECT count(*) FROM pg_database WHERE datname = '$create_dbname';")
1592
1593         if [[ "$db_exists" -gt 0 ]]; then
1594             log_error "Database '$create_dbname' already exists"
1595             log_error "Drop it first or omit --create flag to restore into existing database"
1596             exit "$EXIT_BACKUP_ERROR"
1597         fi
1598
1599         # Create database
1600         if query_db_silent "CREATE DATABASE \"$create_dbname\";"; then
1601             log_success "Created database: $create_dbname"
1602         else
1603             log_error "Failed to create database: $create_dbname"
1604             exit "$EXIT_BACKUP_ERROR"
1605         fi
1606
1607         # Switch back to target database for subsequent operations
1608         DBNAME="$create_dbname"
1609     fi
1610
1611     # Test connection to target database
1612     test_connection
1613
1614     # Restore globals backup
1615     if [[ -f "$globals_backup_path" ]]; then
1616         log_step "Restoring globals (roles, tablespaces)"
1617
1618         # Build connection args for psql
1619         # Note: globals must be restored to postgres database, not target database
1620         local save_dbname="$DBNAME"
1621         DBNAME="postgres"
1622         local psql_args
1623         mapfile -t psql_args < <(build_psql_args)
1624         DBNAME="$save_dbname"
1625
1626         if psql "${psql_args[@]}" -f "$globals_backup_path" >/dev/null 2>&1; then
1627             log_success "Globals restored successfully"
1628         else
1629             log_warning "Globals restore had errors (may be expected if roles already exist)"
1630         fi
1631     else
1632         log_warning "No globals backup found in chain (roles and tablespaces will not be restored)"
1633     fi
1634
1635     # Restore base backup
1636     log_step "Restoring base backup"
1637     local start_time
1638     start_time=$(date +%s)
1639
1640     local psql_args
1641     mapfile -t psql_args < <(build_psql_args)
1642
1643     # Handle compressed backups
1644     if [[ "$base_backup_path" == *.gz ]]; then
1645         log_info "Decompressing gzip backup..."
1646         if gunzip -c "$base_backup_path" | psql "${psql_args[@]}" >/dev/null 2>&1; then
1647             log_success "Base backup restored successfully"
1648         else
1649             log_error "Base backup restore failed"
1650             exit "$EXIT_BACKUP_ERROR"
1651         fi
1652     elif [[ "$base_backup_path" == *.zst ]]; then
1653         log_info "Decompressing zstd backup..."
1654         if zstd -dc "$base_backup_path" | psql "${psql_args[@]}" >/dev/null 2>&1; then
1655             log_success "Base backup restored successfully"
1656         else
1657             log_error "Base backup restore failed"
1658             exit "$EXIT_BACKUP_ERROR"
1659         fi
1660     elif [[ "$base_backup_path" == *.lz4 ]]; then
1661         log_info "Decompressing lz4 backup..."
1662         if lz4 -dc "$base_backup_path" | psql "${psql_args[@]}" >/dev/null 2>&1; then
1663             log_success "Base backup restored successfully"
1664         else
1665             log_error "Base backup restore failed"
1666             exit "$EXIT_BACKUP_ERROR"
1667         fi
1668     else
1669         # Uncompressed backup
1670         if psql "${psql_args[@]}" -f "$base_backup_path" >/dev/null 2>&1; then
1671             log_success "Base backup restored successfully"
1672         else
1673             log_error "Base backup restore failed"
1674             exit "$EXIT_BACKUP_ERROR"
1675         fi
1676     fi
1677
1678     # Apply sealed differentials
1679     if [[ ${#differential_files[@]} -gt 0 ]]; then
1680         log_step "Applying sealed differentials"
1681
1682         local diff_count=0
1683         for diff_file in "${differential_files[@]}"; do
1684             log_info "Applying: $(basename "$diff_file")"
1685
1686             # Optimize for bulk restore: disable synchronous_commit to avoid fsync() on every COMMIT
1687             # This is safe during restore - we can always re-run if it fails
1688             if psql "${psql_args[@]}" -c "SET synchronous_commit = off;" -f "$diff_file" >/dev/null 2>&1; then
1689                 if [[ "$VERBOSE" -eq 1 ]]; then
1690                     log_success "Applied: $(basename "$diff_file")"
1691                 fi
1692                 diff_count=$((diff_count + 1))
1693             else
1694                 log_error "Failed to apply differential: $(basename "$diff_file")"
1695                 log_error "Restore is incomplete"
1696                 exit "$EXIT_BACKUP_ERROR"
1697             fi
1698         done
1699
1700         log_success "Applied $diff_count sealed differential(s)"
1701     fi
1702
1703     # Apply active.sql if requested (WARNING: may be incomplete)
1704     if [[ "$INCLUDE_ACTIVE" -eq 1 ]] && [[ -f "$chain_dir/active.sql" ]]; then
1705         log_step "Applying active.sql (INCOMPLETE DATA WARNING)"
1706         log_warning "active.sql may contain incomplete transactions!"
1707
1708         local psql_args
1709         mapfile -t psql_args < <(build_psql_args)
1710
1711         if psql "${psql_args[@]}" -f "$chain_dir/active.sql" >/dev/null 2>&1; then
1712             log_warning "Applied incomplete active.sql - verify data integrity!"
1713         else
1714             log_error "Failed to apply active.sql"
1715             exit "$EXIT_BACKUP_ERROR"
1716         fi
1717     elif [[ "$INCLUDE_ACTIVE" -eq 1 ]]; then
1718         log_warning "No active.sql found in chain (--include-active was specified)"
1719     fi
1720
1721     # Synchronize sequences
1722     if [[ "$NO_SYNC_SEQUENCES" -eq 0 ]]; then
1723         log_step "Synchronizing sequences"
1724
1725         # Query all sequences and their associated tables
1726         local seq_sync_sql
1727         seq_sync_sql=$(query_db "
1728             SELECT
1729                 'SELECT setval(' ||
1730                 quote_literal(sn.nspname || '.' || s.relname) ||
1731                 ', GREATEST((SELECT COALESCE(MAX(' ||
1732                 quote_ident(a.attname) ||
1733                 '), 1) FROM ' ||
1734                 quote_ident(tn.nspname) || '.' || quote_ident(t.relname) ||
1735                 '), 1));'
1736             FROM pg_class s
1737             JOIN pg_namespace sn ON sn.oid = s.relnamespace
1738             JOIN pg_depend d ON d.objid = s.oid AND d.deptype = 'a'
1739             JOIN pg_class t ON t.oid = d.refobjid
1740             JOIN pg_namespace tn ON tn.oid = t.relnamespace
1741             JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = d.refobjsubid
1742             WHERE s.relkind = 'S'
1743               AND sn.nspname NOT IN ('pg_catalog', 'information_schema')
1744             ORDER BY sn.nspname, s.relname;
1745         " 2>/dev/null)
1746
1747         if [[ -n "$seq_sync_sql" ]]; then
1748             local seq_count=0
1749             while IFS= read -r sync_cmd; do
1750                 if query_db_silent "$sync_cmd"; then
1751                     seq_count=$((seq_count + 1))
1752                     if [[ "$VERBOSE" -eq 1 ]]; then
1753                         log_info "Synced sequence: $(echo "$sync_cmd" | grep -oP "'\K[^']+(?=')")"
1754                     fi
1755                 else
1756                     log_warning "Failed to sync sequence: $sync_cmd"
1757                 fi
1758             done <<< "$seq_sync_sql"
1759
1760             log_success "Synchronized $seq_count sequence(s)"
1761         else
1762             log_info "No sequences found to synchronize"
1763         fi
1764     else
1765         log_info "Skipping sequence synchronization (--no-sync-sequences specified)"
1766     fi
1767
1768     # Calculate restore duration
1769     local end_time
1770     end_time=$(date +%s)
1771     local duration=$((end_time - start_time))
1772
1773     # Report statistics
1774     log_step "Restore Statistics"
1775
1776     # Count rows in all tables
1777     log_info "Counting rows in restored tables..."
1778     local table_count
1779     table_count=$(query_db "SELECT count(*) FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind = 'r' AND n.nspname NOT IN ('pg_catalog', 'information_schema');" 2>/dev/null)
1780
1781     local total_rows
1782     total_rows=$(query_db "
1783         SELECT COALESCE(SUM(n_live_tup), 0)
1784         FROM pg_stat_user_tables;
1785     " 2>/dev/null)
1786
1787     echo -e "${BOLD}Database:${RESET}            $DBNAME" >&2
1788     echo -e "${BOLD}Chain:${RESET}               $chain_id" >&2
1789     echo -e "${BOLD}Tables Restored:${RESET}     $table_count" >&2
1790     echo -e "${BOLD}Total Rows:${RESET}          $total_rows (approximate)" >&2
1791     echo -e "${BOLD}Duration:${RESET}            ${duration}s" >&2
1792     echo -e "${BOLD}Base Backup:${RESET}         $(basename "$base_backup_path")" >&2
1793
1794     if [[ ${#differential_files[@]} -gt 0 ]]; then
1795         echo -e "${BOLD}Differentials Applied:${RESET}  ${#differential_files[@]}" >&2
1796     fi
1797
1798     if [[ "$INCLUDE_ACTIVE" -eq 1 ]] && [[ -f "$chain_dir/active.sql" ]]; then
1799         echo -e "${BOLD}Included active.sql:${RESET} ${YELLOW}YES (incomplete data)${RESET}" >&2
1800     fi
1801
1802     # Final success message
1803     echo >&2
1804     log_step "Restore Complete"
1805     log_success "Database successfully restored to: $DBNAME"
1806     log_info "Next steps:"
1807     log_info "  1. Verify data integrity:"
1808     log_info "     psql -d $DBNAME -c 'SELECT COUNT(*) FROM <your_table>;'"
1809     log_info "  2. Run application smoke tests"
1810     log_info "  3. Switch application to restored database"
1811
1812     exit "$EXIT_SUCCESS"
1813 }
1814
1815 #
1816 # --status command implementation
1817 #
cmd_status() {
    # Report replication-slot health and, when BACKUP_DIR is set, an inventory
    # of the backup chains found there. All output goes to stderr.
    #
    # Globals read: SLOT, BACKUP_DIR, EXIT_SUCCESS, EXIT_WARNING and the colour
    #               variables BOLD, RESET, RED, GREEN, YELLOW.
    # Exit status:  this function always terminates the script, with
    #               EXIT_SUCCESS when nothing was flagged and EXIT_WARNING
    #               when any warning or critical condition was reported.
    #
    # The script runs under `set -euo pipefail`, so the best-effort probes
    # below (ps, stat, du, find) are guarded explicitly: a probe that fails,
    # e.g. because a file was rotated away mid-report, must not abort the
    # whole status report.
    log_step "Checking pg_scribe backup system status"

    # Validate required arguments
    validate_required_args "status" "DBNAME:-d/--dbname"

    # Test connection
    test_connection

    # Track warnings for exit code
    local has_warnings=0

    # Check replication slot status
    log_step "Replication Slot Status"

    # Verify replication slot exists
    check_replication_slot "$SLOT" 1

    # Query slot details (only the first returned row is used)
    local slot_info
    slot_info=$(query_db "
        SELECT
            slot_name,
            slot_type,
            database,
            active,
            restart_lsn,
            confirmed_flush_lsn,
            pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) as restart_lag_bytes,
            pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) as confirmed_lag_bytes,
            pg_current_wal_lsn() as current_lsn
        FROM pg_replication_slots
        WHERE slot_name = '$SLOT';
    " | head -1)

    # Parse slot info; the fields are local so they do not leak into globals
    local slot_name slot_type db_name active restart_lsn confirmed_flush_lsn
    local restart_lag_bytes confirmed_lag_bytes current_lsn
    IFS='|' read -r slot_name slot_type db_name active restart_lsn confirmed_flush_lsn restart_lag_bytes confirmed_lag_bytes current_lsn <<< "$slot_info"

    # A lag field may be empty (e.g. when the corresponding LSN is NULL).
    # Treat anything that is not a plain integer as zero lag so the arithmetic
    # and comparisons below cannot fail on it.
    local int_re='^[[:space:]]*-?[0-9]+[[:space:]]*$'
    [[ "$restart_lag_bytes" =~ $int_re ]] || restart_lag_bytes=0
    [[ "$confirmed_lag_bytes" =~ $int_re ]] || confirmed_lag_bytes=0

    # Display slot information
    echo -e "${BOLD}Slot Name:${RESET}       $slot_name" >&2
    echo -e "${BOLD}Slot Type:${RESET}       $slot_type" >&2
    echo -e "${BOLD}Database:${RESET}        $db_name" >&2

    if [[ "$active" == "t" ]]; then
        echo -e "${BOLD}Active:${RESET}          ${GREEN}Yes${RESET}" >&2
    else
        echo -e "${BOLD}Active:${RESET}          ${YELLOW}No${RESET}" >&2
        log_warning "Replication slot is not active"
        has_warnings=1
    fi

    echo -e "${BOLD}Current WAL LSN:${RESET} $current_lsn" >&2
    echo -e "${BOLD}Restart LSN:${RESET}     $restart_lsn" >&2
    echo -e "${BOLD}Confirmed LSN:${RESET}   $confirmed_flush_lsn" >&2

    # Format lag in human-readable sizes
    local restart_lag_mb=$((restart_lag_bytes / 1024 / 1024))
    local confirmed_lag_mb=$((confirmed_lag_bytes / 1024 / 1024))

    # Check lag thresholds (based on design doc): 10737418240 = 10GB, 1073741824 = 1GB
    if [[ "$restart_lag_bytes" -gt 10737418240 ]]; then
        # > 10GB - CRITICAL
        echo -e "${BOLD}Restart Lag:${RESET}     ${RED}${restart_lag_mb} MB (CRITICAL!)${RESET}" >&2
        log_error "CRITICAL: Replication lag exceeds 10GB!"
        log_error "  This may cause disk space issues or database shutdown"
        log_error "  Consider dropping the slot if backup collection has stopped"
        has_warnings=1
    elif [[ "$restart_lag_bytes" -gt 1073741824 ]]; then
        # > 1GB - WARNING
        echo -e "${BOLD}Restart Lag:${RESET}     ${YELLOW}${restart_lag_mb} MB (WARNING)${RESET}" >&2
        log_warning "Replication lag exceeds 1GB"
        log_warning "  Ensure backup collection is running and healthy"
        has_warnings=1
    else
        echo -e "${BOLD}Restart Lag:${RESET}     ${GREEN}${restart_lag_mb} MB${RESET}" >&2
    fi

    if [[ "$confirmed_lag_bytes" -gt 10737418240 ]]; then
        echo -e "${BOLD}Confirmed Lag:${RESET}   ${RED}${confirmed_lag_mb} MB (CRITICAL!)${RESET}" >&2
        has_warnings=1
    elif [[ "$confirmed_lag_bytes" -gt 1073741824 ]]; then
        echo -e "${BOLD}Confirmed Lag:${RESET}   ${YELLOW}${confirmed_lag_mb} MB (WARNING)${RESET}" >&2
        has_warnings=1
    else
        echo -e "${BOLD}Confirmed Lag:${RESET}   ${GREEN}${confirmed_lag_mb} MB${RESET}" >&2
    fi

    # Slot age is not reported here; only a separator line is printed.
    echo >&2

    # Analyze backup directory if provided
    if [[ -n "$BACKUP_DIR" ]]; then
        log_step "Chain Inventory"

        if [[ ! -d "$BACKUP_DIR" ]]; then
            log_warning "Backup directory does not exist: $BACKUP_DIR"
            has_warnings=1
        else
            # Find all chains (direct chain-* subdirectories, sorted by name)
            local -a chains=()
            mapfile -t chains < <(find "$BACKUP_DIR" -maxdepth 1 -type d -name 'chain-*' 2>/dev/null | sort)

            if [[ ${#chains[@]} -eq 0 ]]; then
                log_warning "No chains found in backup directory"
                log_warning "  Run --init to create the initial chain"
                has_warnings=1
            else
                echo -e "${BOLD}Backup Directory:${RESET} $BACKUP_DIR" >&2
                echo "" >&2

                # Determine which chain is active
                local pidfile="$BACKUP_DIR/.pg_scribe.pid"
                local active_chain_id=""
                local active_pid=""

                if [[ -f "$pidfile" ]]; then
                    local pid=""
                    local pg_recv_cmdline=""
                    local active_file=""
                    local file_arg_re='--file=([^ ]+)'

                    # The pidfile may disappear between the -f test and the read
                    pid=$(cat "$pidfile" 2>/dev/null) || pid=""
                    if [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null; then
                        # Get which file the process is actually writing to from
                        # its command line. The process may exit right after the
                        # kill -0 probe, so a failing ps just means "not active".
                        pg_recv_cmdline=$(ps -p "$pid" -o args= 2>/dev/null) || pg_recv_cmdline=""

                        # Extract --file=... from command line
                        if [[ "$pg_recv_cmdline" =~ $file_arg_re ]]; then
                            active_file="${BASH_REMATCH[1]}"
                        fi

                        if [[ -n "$active_file" ]] && [[ -f "$active_file" ]]; then
                            active_chain_id=$(basename "$(dirname "$active_file")" | sed 's/^chain-//')
                            active_pid="$pid"
                        fi
                    fi
                fi

                # Display each chain
                local chain_dir chain_id base_backup base_size candidate
                local diff_count total_size last_mod mtime age_seconds age_minutes
                for chain_dir in "${chains[@]}"; do
                    # chain-<id> directory name -> <id>
                    chain_id="${chain_dir##*/}"
                    chain_id="${chain_id#chain-}"

                    # Gather chain info: size of the base backup, trying the
                    # plain file first and then the compressed variants
                    base_backup="$chain_dir/base.sql"
                    base_size=""
                    for candidate in "$base_backup" "$base_backup.gz" "$base_backup.lz4" "$base_backup.zst"; do
                        if [[ -f "$candidate" ]]; then
                            base_size=$(get_file_size "$candidate")
                            break
                        fi
                    done

                    # Best-effort probes: keep whatever was printed even if the
                    # tool reported an error (pipefail would otherwise abort)
                    diff_count=$(find "$chain_dir" -maxdepth 1 -name 'diff-*.sql' 2>/dev/null | wc -l) || true
                    total_size=$(du -sh "$chain_dir" 2>/dev/null | cut -f1) || true

                    # Check if this chain is active
                    if [[ "$chain_id" == "$active_chain_id" ]]; then
                        echo -e "  ${GREEN}chain-$chain_id${RESET} ${BOLD}(ACTIVE - streaming)${RESET}" >&2
                        echo -e "    ${BOLD}PID:${RESET}            $active_pid" >&2
                    else
                        echo -e "  chain-$chain_id" >&2
                    fi

                    echo -e "    ${BOLD}Base backup:${RESET}    $base_size" >&2
                    echo -e "    ${BOLD}Differentials:${RESET}  $diff_count sealed" >&2
                    echo -e "    ${BOLD}Total size:${RESET}     $total_size" >&2

                    # Show last activity if active.sql exists. The file can be
                    # renamed between the -f test and stat, so the section is
                    # skipped when no usable mtime comes back.
                    if [[ -f "$chain_dir/active.sql" ]]; then
                        mtime=$(stat -c %Y "$chain_dir/active.sql" 2>/dev/null) || mtime=""
                        if [[ "$mtime" =~ ^[0-9]+$ ]]; then
                            last_mod=$(stat -c %y "$chain_dir/active.sql" 2>/dev/null | cut -d. -f1) || last_mod=""
                            age_seconds=$(( $(date +%s) - mtime ))
                            age_minutes=$((age_seconds / 60))

                            echo -e "    ${BOLD}Last activity:${RESET}  $last_mod ($age_minutes minutes ago)" >&2

                            # Warn if last activity is old (only for active chain)
                            if [[ "$chain_id" == "$active_chain_id" ]] && [[ "$age_minutes" -gt 60 ]]; then
                                log_warning "Active chain has no activity for ${age_minutes} minutes"
                                log_warning "  Verify that streaming is working correctly"
                                has_warnings=1
                            fi
                        fi
                    fi

                    echo "" >&2
                done

                # Calculate total backup directory size
                total_size=$(du -sh "$BACKUP_DIR" 2>/dev/null | cut -f1) || true
                echo -e "${BOLD}Total Backup Size:${RESET} $total_size" >&2
            fi
        fi
    fi

    # Overall health summary
    echo >&2
    log_step "Health Summary"

    if [[ "$has_warnings" -eq 0 ]]; then
        log_success "System is healthy"
        echo >&2
        log_info "Replication slot is active and lag is acceptable"
        if [[ -n "$BACKUP_DIR" ]]; then
            log_info "Backup directory appears healthy"
        fi
        exit "$EXIT_SUCCESS"
    else
        log_warning "System has warnings - review messages above"
        echo >&2
        log_info "Address any CRITICAL or WARNING issues promptly"
        log_info "See design doc for monitoring recommendations"
        exit "$EXIT_WARNING"
    fi
}
2040
2041 # Main entry point
main() {
    # Script entry point: parse the command line, then hand control to the
    # handler for the selected action. parse_args is expected to populate
    # ACTION; an unrecognised action is reported and ends the script with
    # EXIT_GENERAL_ERROR.
    parse_args "$@"

    local handler
    case "$ACTION" in
        init | start | stop | rotate-diff | new-chain | restore | status)
            # Each action maps onto cmd_<action>, with dashes turned into
            # underscores (rotate-diff -> cmd_rotate_diff)
            handler="cmd_${ACTION//-/_}"
            ;;
        *)
            log_error "Unknown action: $ACTION"
            exit "$EXIT_GENERAL_ERROR"
            ;;
    esac

    "$handler"
}
2073
2074 # Run main with all arguments
2075 main "$@"