]> begriffs open source - pg_scribe/blob - wal2sql/wal2sql.c
Seal diffs at chain transfer
[pg_scribe] / wal2sql / wal2sql.c
1 /*-------------------------------------------------------------------------
2  *
3  * wal2sql.c
4  *              Logical decoding output plugin generating SQL queries based
5  *              on things decoded.
6  *
7  * Forked from decoder_raw by Michael Paquier
8  * https://github.com/michaelpq/pg_plugins/tree/main/decoder_raw
9  *
10  * Copyright (c) 1996-2025, PostgreSQL Global Development Group
11  *
12  * IDENTIFICATION
13  *                wal2sql/wal2sql.c
14  *
15  *-------------------------------------------------------------------------
16  */
17
18 #include "postgres.h"
19
20 #include "access/genam.h"
21 #include "access/sysattr.h"
22 #include "catalog/pg_class.h"
23 #include "catalog/pg_type.h"
24 #include "nodes/parsenodes.h"
25 #include "replication/output_plugin.h"
26 #include "replication/logical.h"
27 #include "utils/builtins.h"
28 #include "utils/lsyscache.h"
29 #include "utils/memutils.h"
30 #include "utils/rel.h"
31 #include "utils/relcache.h"
32 #include "utils/syscache.h"
33 #include "utils/typcache.h"
34
35
36 PG_MODULE_MAGIC;
37
38 /* These must be available to pg_dlsym() */
39 extern void _PG_init(void);
40 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
41
42 /*
43  * Structure storing the plugin specifications and options.
44  */
45 typedef struct
46 {
47         MemoryContext context;
48         bool            include_transaction;
49 }                       Wal2SqlData;
50
51 static void wal2sql_startup(LogicalDecodingContext *ctx,
52                                                         OutputPluginOptions *opt,
53                                                         bool is_init);
54 static void wal2sql_shutdown(LogicalDecodingContext *ctx);
55 static void wal2sql_begin_txn(LogicalDecodingContext *ctx,
56                                                           ReorderBufferTXN *txn);
57 static void wal2sql_commit_txn(LogicalDecodingContext *ctx,
58                                                            ReorderBufferTXN *txn,
59                                                            XLogRecPtr commit_lsn);
60 static void wal2sql_change(LogicalDecodingContext *ctx,
61                                                    ReorderBufferTXN *txn, Relation rel,
62                                                    ReorderBufferChange *change);
63 static void wal2sql_message(LogicalDecodingContext *ctx,
64                                                         ReorderBufferTXN *txn,
65                                                         XLogRecPtr message_lsn,
66                                                         bool transactional,
67                                                         const char *prefix,
68                                                         Size message_size,
69                                                         const char *message);
70 static void wal2sql_truncate(LogicalDecodingContext *ctx,
71                                                          ReorderBufferTXN *txn,
72                                                          int nrelations,
73                                                          Relation relations[],
74                                                          ReorderBufferChange *change);
75
/*
 * Library load hook. This plugin has no load-time setup, so the body is
 * intentionally empty.
 */
void
_PG_init(void)
{
	/* nothing to do; other plugins can perform things here */
}
81
82 /* specify output plugin callbacks */
83 void
84 _PG_output_plugin_init(OutputPluginCallbacks *cb)
85 {
86         AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
87
88         cb->startup_cb = wal2sql_startup;
89         cb->begin_cb = wal2sql_begin_txn;
90         cb->change_cb = wal2sql_change;
91         cb->commit_cb = wal2sql_commit_txn;
92         cb->message_cb = wal2sql_message;
93         cb->truncate_cb = wal2sql_truncate;
94         cb->shutdown_cb = wal2sql_shutdown;
95 }
96
97
98 /* initialize this plugin */
99 static void
100 wal2sql_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
101                                 bool is_init)
102 {
103         ListCell   *option;
104         Wal2SqlData *data;
105
106         data = palloc(sizeof(Wal2SqlData));
107         data->context = AllocSetContextCreate(ctx->context,
108                                                                                   "wal2sql context",
109                                                                                   ALLOCSET_DEFAULT_SIZES);
110         data->include_transaction = false;
111
112         ctx->output_plugin_private = data;
113
114         /* Default output format */
115         opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
116
117         foreach(option, ctx->output_plugin_options)
118         {
119                 DefElem    *elem = lfirst(option);
120
121                 Assert(elem->arg == NULL || IsA(elem->arg, String));
122
123                 if (strcmp(elem->defname, "include_transaction") == 0)
124                 {
125                         /* if option does not provide a value, it means its value is true */
126                         if (elem->arg == NULL)
127                                 data->include_transaction = true;
128                         else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
129                                 ereport(ERROR,
130                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
131                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
132                                                                 strVal(elem->arg), elem->defname)));
133                 }
134                 else if (strcmp(elem->defname, "output_format") == 0)
135                 {
136                         char       *format = NULL;
137
138                         if (elem->arg == NULL)
139                                 ereport(ERROR,
140                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
141                                                  errmsg("No value specified for parameter \"%s\"",
142                                                                 elem->defname)));
143
144                         format = strVal(elem->arg);
145
146                         if (strcmp(format, "textual") == 0)
147                                 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
148                         else if (strcmp(format, "binary") == 0)
149                                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
150                         else
151                                 ereport(ERROR,
152                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
153                                                  errmsg("Incorrect value \"%s\" for parameter \"%s\"",
154                                                                 format, elem->defname)));
155                 }
156                 else
157                 {
158                         ereport(ERROR,
159                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
160                                          errmsg("option \"%s\" = \"%s\" is unknown",
161                                                         elem->defname,
162                                                         elem->arg ? strVal(elem->arg) : "(null)")));
163                 }
164         }
165 }
166
167 /* cleanup this plugin's resources */
168 static void
169 wal2sql_shutdown(LogicalDecodingContext *ctx)
170 {
171         Wal2SqlData *data = ctx->output_plugin_private;
172
173         /* cleanup our own resources via memory context reset */
174         MemoryContextDelete(data->context);
175 }
176
177 /* BEGIN callback */
178 static void
179 wal2sql_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
180 {
181         Wal2SqlData *data = ctx->output_plugin_private;
182
183         /* Write to the plugin only if there is */
184         if (data->include_transaction)
185         {
186                 OutputPluginPrepareWrite(ctx, true);
187                 appendStringInfoString(ctx->out, "BEGIN;");
188                 OutputPluginWrite(ctx, true);
189         }
190 }
191
192 /* COMMIT callback */
193 static void
194 wal2sql_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
195                                    XLogRecPtr commit_lsn)
196 {
197         Wal2SqlData *data = ctx->output_plugin_private;
198
199         /* Write to the plugin only if there is */
200         if (data->include_transaction)
201         {
202                 OutputPluginPrepareWrite(ctx, true);
203                 appendStringInfoString(ctx->out, "COMMIT;");
204                 OutputPluginWrite(ctx, true);
205         }
206 }
207
208 /*
209  * Print literal `outputstr' already represented as string of type `typid'
210  * into stringbuf `s'.
211  *
212  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
213  * if standard_conforming_strings were enabled.
214  */
215 static void
216 print_literal(StringInfo s, Oid typid, char *outputstr)
217 {
218         const char *valptr;
219
220         switch (typid)
221         {
222                 case BOOLOID:
223                         if (outputstr[0] == 't')
224                                 appendStringInfoString(s, "true");
225                         else
226                                 appendStringInfoString(s, "false");
227                         break;
228
229                 case INT2OID:
230                 case INT4OID:
231                 case INT8OID:
232                 case OIDOID:
233                         /* NB: We don't care about Inf, NaN et al. */
234                         appendStringInfoString(s, outputstr);
235                         break;
236                 case FLOAT4OID:
237                 case FLOAT8OID:
238                 case NUMERICOID:
239
240                         /*
241                          * Numeric can have NaN. Float can have Nan, Infinity and
242                          * -Infinity. These need to be quoted.
243                          */
244                         if (strcmp(outputstr, "NaN") == 0 ||
245                                 strcmp(outputstr, "Infinity") == 0 ||
246                                 strcmp(outputstr, "-Infinity") == 0)
247                                 appendStringInfo(s, "'%s'", outputstr);
248                         else
249                                 appendStringInfoString(s, outputstr);
250                         break;
251                 case BITOID:
252                 case VARBITOID:
253                         appendStringInfo(s, "B'%s'", outputstr);
254                         break;
255
256                 default:
257                         appendStringInfoChar(s, '\'');
258                         for (valptr = outputstr; *valptr; valptr++)
259                         {
260                                 char            ch = *valptr;
261
262                                 if (SQL_STR_DOUBLE(ch, false))
263                                         appendStringInfoChar(s, ch);
264                                 appendStringInfoChar(s, ch);
265                         }
266                         appendStringInfoChar(s, '\'');
267                         break;
268         }
269 }
270
271 /*
272  * Print a relation name into the StringInfo provided by caller.
273  */
274 static void
275 print_relname(StringInfo s, Relation rel)
276 {
277         Form_pg_class class_form = RelationGetForm(rel);
278
279         appendStringInfoString(s,
280                                                    quote_qualified_identifier(
281                                                                                                           get_namespace_name(
282                                                                                                                                                  get_rel_namespace(RelationGetRelid(rel))),
283                                                                                                           NameStr(class_form->relname)));
284 }
285
286 /*
287  * Print a value into the StringInfo provided by caller.
288  */
289 static void
290 print_value(StringInfo s, Datum origval, Oid typid, bool isnull)
291 {
292         Oid                     typoutput;
293         bool            typisvarlena;
294
295         /* Query output function */
296         getTypeOutputInfo(typid,
297                                           &typoutput, &typisvarlena);
298
299         /* Print value */
300         if (isnull)
301                 appendStringInfoString(s, "null");
302         else if (typisvarlena &&
303                          VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
304         {
305                 /*
306                  * This should not happen, the column and its value can be skipped
307                  * properly. This code is let on purpose to avoid any traps in this
308                  * area in the future, generating useless queries in non-assert
309                  * builds.
310                  */
311                 Assert(0);
312                 appendStringInfoString(s, "unchanged-toast-datum");
313         }
314         else if (!typisvarlena)
315                 print_literal(s, typid,
316                                           OidOutputFunctionCall(typoutput, origval));
317         else
318         {
319                 /* Definitely detoasted Datum */
320                 Datum           val;
321
322                 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
323                 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
324         }
325 }
326
327 /*
328  * Print a WHERE clause item
329  */
330 static void
331 print_where_clause_item(StringInfo s,
332                                                 Relation relation,
333                                                 HeapTuple tuple,
334                                                 int natt,
335                                                 bool *first_column)
336 {
337         Form_pg_attribute attr;
338         Datum           origval;
339         bool            isnull;
340         TupleDesc       tupdesc = RelationGetDescr(relation);
341
342         attr = TupleDescAttr(tupdesc, natt - 1);
343
344         /* Skip dropped columns and system columns */
345         if (attr->attisdropped || attr->attnum < 0)
346                 return;
347
348         /* Skip comma for first colums */
349         if (!*first_column)
350                 appendStringInfoString(s, " AND ");
351         else
352                 *first_column = false;
353
354         /* Print attribute name */
355         appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
356
357         /* Get Datum from tuple */
358         origval = heap_getattr(tuple, natt, tupdesc, &isnull);
359
360         /* Get output function */
361         print_value(s, origval, attr->atttypid, isnull);
362 }
363
364 /*
365  * Generate a WHERE clause for UPDATE or DELETE.
366  */
367 static void
368 print_where_clause(StringInfo s,
369                                    Relation relation,
370                                    HeapTuple oldtuple,
371                                    HeapTuple newtuple)
372 {
373         TupleDesc       tupdesc = RelationGetDescr(relation);
374         int                     natt;
375         bool            first_column = true;
376
377         Assert(relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
378                    relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
379                    relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
380
381         /* Build the WHERE clause */
382         appendStringInfoString(s, " WHERE ");
383
384         RelationGetIndexList(relation);
385         /* Generate WHERE clause using new values of REPLICA IDENTITY */
386         if (OidIsValid(relation->rd_replidindex))
387         {
388                 Relation        indexRel;
389                 int                     key;
390
391                 /* Use all the values associated with the index */
392                 indexRel = index_open(relation->rd_replidindex, AccessShareLock);
393                 for (key = 0; key < indexRel->rd_index->indnatts; key++)
394                 {
395                         int                     relattr = indexRel->rd_index->indkey.values[key];
396
397                         /*
398                          * For a relation having REPLICA IDENTITY set at DEFAULT or INDEX,
399                          * if one of the columns used for tuple selectivity is changed,
400                          * the old tuple data is not NULL and need to be used for tuple
401                          * selectivity. If no such columns are updated, old tuple data is
402                          * NULL.
403                          */
404                         print_where_clause_item(s, relation,
405                                                                         oldtuple ? oldtuple : newtuple,
406                                                                         relattr, &first_column);
407                 }
408                 index_close(indexRel, NoLock);
409                 return;
410         }
411
412         /* We need absolutely some values for tuple selectivity now */
413         Assert(oldtuple != NULL &&
414                    relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
415
416         /*
417          * Fallback to default case, use of old values and print WHERE clause
418          * using all the columns. This is actually the code path for FULL.
419          */
420         for (natt = 0; natt < tupdesc->natts; natt++)
421                 print_where_clause_item(s, relation, oldtuple,
422                                                                 natt + 1, &first_column);
423 }
424
425 /*
426  * Decode an INSERT entry
427  */
428 static void
429 wal2sql_insert(StringInfo s,
430                            Relation relation,
431                            HeapTuple tuple)
432 {
433         TupleDesc       tupdesc = RelationGetDescr(relation);
434         int                     natt;
435         bool            first_column = true;
436         StringInfo      values = makeStringInfo();
437
438         /* Initialize string info for values */
439         initStringInfo(values);
440
441         /* Query header */
442         appendStringInfo(s, "INSERT INTO ");
443         print_relname(s, relation);
444         appendStringInfo(s, " (");
445
446         /* Build column names and values */
447         for (natt = 0; natt < tupdesc->natts; natt++)
448         {
449                 Form_pg_attribute attr;
450                 Datum           origval;
451                 bool            isnull;
452
453                 attr = TupleDescAttr(tupdesc, natt);
454
455                 /* Skip dropped columns and system columns */
456                 if (attr->attisdropped || attr->attnum < 0)
457                         continue;
458
459                 /* Skip comma for first colums */
460                 if (!first_column)
461                 {
462                         appendStringInfoString(s, ", ");
463                         appendStringInfoString(values, ", ");
464                 }
465                 else
466                         first_column = false;
467
468                 /* Print attribute name */
469                 appendStringInfo(s, "%s", quote_identifier(NameStr(attr->attname)));
470
471                 /* Get Datum from tuple */
472                 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
473
474                 /* Get output function */
475                 print_value(values, origval, attr->atttypid, isnull);
476         }
477
478         /* Append values  */
479         appendStringInfo(s, ") VALUES (%s);", values->data);
480
481         /* Clean up */
482         resetStringInfo(values);
483 }
484
485 /*
486  * Decode a DELETE entry
487  */
488 static void
489 wal2sql_delete(StringInfo s,
490                            Relation relation,
491                            HeapTuple tuple)
492 {
493         appendStringInfo(s, "DELETE FROM ");
494         print_relname(s, relation);
495
496         /*
497          * Here the same tuple is used as old and new values, selectivity will be
498          * properly reduced by relation uses DEFAULT or INDEX as REPLICA IDENTITY.
499          */
500         print_where_clause(s, relation, tuple, tuple);
501         appendStringInfoString(s, ";");
502 }
503
504
505 /*
506  * Decode an UPDATE entry
507  */
508 static void
509 wal2sql_update(StringInfo s,
510                            Relation relation,
511                            HeapTuple oldtuple,
512                            HeapTuple newtuple)
513 {
514         TupleDesc       tupdesc = RelationGetDescr(relation);
515         int                     natt;
516         bool            first_column = true;
517
518         /* If there are no new values, simply leave as there is nothing to do */
519         if (newtuple == NULL)
520                 return;
521
522         appendStringInfo(s, "UPDATE ");
523         print_relname(s, relation);
524
525         /* Build the SET clause with the new values */
526         appendStringInfo(s, " SET ");
527         for (natt = 0; natt < tupdesc->natts; natt++)
528         {
529                 Form_pg_attribute attr;
530                 Datum           origval;
531                 bool            isnull;
532                 Oid                     typoutput;
533                 bool            typisvarlena;
534
535                 attr = TupleDescAttr(tupdesc, natt);
536
537                 /* Skip dropped columns and system columns */
538                 if (attr->attisdropped || attr->attnum < 0)
539                         continue;
540
541                 /* Get Datum from tuple */
542                 origval = heap_getattr(newtuple, natt + 1, tupdesc, &isnull);
543
544                 /* Get output type so we can know if it's varlena */
545                 getTypeOutputInfo(attr->atttypid,
546                                                   &typoutput, &typisvarlena);
547
548                 /*
549                  * TOASTed datum, but it is not changed so it can be skipped this in
550                  * the SET clause of this UPDATE query.
551                  */
552                 if (!isnull && typisvarlena &&
553                         VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
554                         continue;
555
556                 /* Skip comma for first colums */
557                 if (!first_column)
558                 {
559                         appendStringInfoString(s, ", ");
560                 }
561                 else
562                         first_column = false;
563
564                 /* Print attribute name */
565                 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
566
567                 /* Get output function */
568                 print_value(s, origval, attr->atttypid, isnull);
569         }
570
571         /* Print WHERE clause */
572         print_where_clause(s, relation, oldtuple, newtuple);
573
574         appendStringInfoString(s, ";");
575 }
576
577 /*
578  * Callback for individual changed tuples
579  */
580 static void
581 wal2sql_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
582                            Relation relation, ReorderBufferChange *change)
583 {
584         Wal2SqlData *data;
585         MemoryContext old;
586         char            replident = relation->rd_rel->relreplident;
587         bool            is_rel_non_selective;
588
589         data = ctx->output_plugin_private;
590
591         /* Avoid leaking memory by using and resetting our own context */
592         old = MemoryContextSwitchTo(data->context);
593
594         /*
595          * Determine if relation is selective enough for WHERE clause generation
596          * in UPDATE and DELETE cases. A non-selective relation uses REPLICA
597          * IDENTITY set as NOTHING, or DEFAULT without an available replica
598          * identity index.
599          */
600         RelationGetIndexList(relation);
601         is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
602                                                         (replident == REPLICA_IDENTITY_DEFAULT &&
603                                                          !OidIsValid(relation->rd_replidindex)));
604
605         /* Decode entry depending on its type */
606         switch (change->action)
607         {
608                 case REORDER_BUFFER_CHANGE_INSERT:
609                         if (change->data.tp.newtuple != NULL)
610                         {
611                                 OutputPluginPrepareWrite(ctx, true);
612                                 wal2sql_insert(ctx->out,
613                                                            relation,
614                                                            change->data.tp.newtuple);
615                                 OutputPluginWrite(ctx, true);
616                         }
617                         break;
618                 case REORDER_BUFFER_CHANGE_UPDATE:
619                         if (!is_rel_non_selective)
620                         {
621                                 HeapTuple       oldtuple = change->data.tp.oldtuple;
622                                 HeapTuple       newtuple = change->data.tp.newtuple;
623
624                                 OutputPluginPrepareWrite(ctx, true);
625                                 wal2sql_update(ctx->out,
626                                                            relation,
627                                                            oldtuple,
628                                                            newtuple);
629                                 OutputPluginWrite(ctx, true);
630                         }
631                         break;
632                 case REORDER_BUFFER_CHANGE_DELETE:
633                         if (!is_rel_non_selective)
634                         {
635                                 OutputPluginPrepareWrite(ctx, true);
636                                 wal2sql_delete(ctx->out,
637                                                            relation,
638                                                            change->data.tp.oldtuple);
639                                 OutputPluginWrite(ctx, true);
640                         }
641                         break;
642                 default:
643                         /* Should not come here */
644                         Assert(0);
645                         break;
646         }
647
648         MemoryContextSwitchTo(old);
649         MemoryContextReset(data->context);
650 }
651
652 /*
653  * Callback for generic logical decoding messages.
654  *
655  * This callback handles messages emitted via pg_logical_emit_message().
656  * We use this to capture DDL commands that are emitted by event triggers.
657  * The event triggers emit messages with prefix 'ddl' containing the DDL
658  * statement text.
659  */
660 static void
661 wal2sql_message(LogicalDecodingContext *ctx,
662                                 ReorderBufferTXN *txn,
663                                 XLogRecPtr message_lsn,
664                                 bool transactional,
665                                 const char *prefix,
666                                 Size message_size,
667                                 const char *message)
668 {
669         /*
670          * Only process messages with the 'ddl' prefix. These are DDL statements
671          * emitted by our event triggers using pg_logical_emit_message().
672          */
673         if (strcmp(prefix, "ddl") != 0)
674                 return;
675
676         /*
677          * Output the DDL statement. The message already contains the complete
678          * DDL command in executable SQL format (e.g., "ALTER TABLE foo ADD COLUMN bar TEXT;")
679          * so we output it as-is.
680          *
681          * Use appendBinaryStringInfo to properly handle the message with its size,
682          * as the message may not be null-terminated.
683          */
684         OutputPluginPrepareWrite(ctx, true);
685         appendBinaryStringInfo(ctx->out, message, message_size);
686         OutputPluginWrite(ctx, true);
687 }
688
689 /*
690  * Callback for TRUNCATE operations.
691  *
692  * TRUNCATE is a DML operation and does NOT trigger DDL event triggers,
693  * so it must be captured via logical replication using this callback.
694  * The callback receives an array of relations since TRUNCATE can affect
695  * multiple tables (e.g., via foreign key cascades or explicit multi-table syntax).
696  */
697 static void
698 wal2sql_truncate(LogicalDecodingContext *ctx,
699                                  ReorderBufferTXN *txn,
700                                  int nrelations,
701                                  Relation relations[],
702                                  ReorderBufferChange *change)
703 {
704         Wal2SqlData *data;
705         MemoryContext old;
706         int                     i;
707
708         data = ctx->output_plugin_private;
709
710         /* Avoid leaking memory by using and resetting our own context */
711         old = MemoryContextSwitchTo(data->context);
712
713         OutputPluginPrepareWrite(ctx, true);
714
715         /* Output TRUNCATE statement */
716         appendStringInfoString(ctx->out, "TRUNCATE TABLE ");
717
718         /* Add all table names, separated by commas */
719         for (i = 0; i < nrelations; i++)
720         {
721                 if (i > 0)
722                         appendStringInfoString(ctx->out, ", ");
723
724                 print_relname(ctx->out, relations[i]);
725         }
726
727         appendStringInfoString(ctx->out, ";");
728
729         OutputPluginWrite(ctx, true);
730
731         MemoryContextSwitchTo(old);
732         MemoryContextReset(data->context);
733 }