begriffs open source - pg_scribe/blob - wal2sql/wal2sql.c
Initial draft of capturing DDL
[pg_scribe] / wal2sql / wal2sql.c
1 /*-------------------------------------------------------------------------
2  *
3  * wal2sql.c
4  *              Logical decoding output plugin generating SQL queries based
5  *              on things decoded.
6  *
7  * Forked from decoder_raw by Michael Paquier
8  * https://github.com/michaelpq/pg_plugins/tree/main/decoder_raw
9  *
10  * Copyright (c) 1996-2025, PostgreSQL Global Development Group
11  *
12  * IDENTIFICATION
13  *                wal2sql/wal2sql.c
14  *
15  *-------------------------------------------------------------------------
16  */
17
18 #include "postgres.h"
19
20 #include "access/genam.h"
21 #include "access/sysattr.h"
22 #include "catalog/pg_class.h"
23 #include "catalog/pg_type.h"
24 #include "nodes/parsenodes.h"
25 #include "replication/output_plugin.h"
26 #include "replication/logical.h"
27 #include "utils/builtins.h"
28 #include "utils/lsyscache.h"
29 #include "utils/memutils.h"
30 #include "utils/rel.h"
31 #include "utils/relcache.h"
32 #include "utils/syscache.h"
33 #include "utils/typcache.h"
34
35
36 PG_MODULE_MAGIC;
37
38 /* These must be available to pg_dlsym() */
39 extern void _PG_init(void);
40 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
41
42 /*
43  * Structure storing the plugin specifications and options.
44  */
45 typedef struct
46 {
47         MemoryContext context;
48         bool            include_transaction;
49 }                       Wal2SqlData;
50
51 static void wal2sql_startup(LogicalDecodingContext *ctx,
52                                                         OutputPluginOptions *opt,
53                                                         bool is_init);
54 static void wal2sql_shutdown(LogicalDecodingContext *ctx);
55 static void wal2sql_begin_txn(LogicalDecodingContext *ctx,
56                                                           ReorderBufferTXN *txn);
57 static void wal2sql_commit_txn(LogicalDecodingContext *ctx,
58                                                            ReorderBufferTXN *txn,
59                                                            XLogRecPtr commit_lsn);
60 static void wal2sql_change(LogicalDecodingContext *ctx,
61                                                    ReorderBufferTXN *txn, Relation rel,
62                                                    ReorderBufferChange *change);
63 static void wal2sql_message(LogicalDecodingContext *ctx,
64                                                         ReorderBufferTXN *txn,
65                                                         XLogRecPtr message_lsn,
66                                                         bool transactional,
67                                                         const char *prefix,
68                                                         Size message_size,
69                                                         const char *message);
70
/*
 * Library load hook. This plugin has nothing to set up here; the function
 * is kept as the place where load-time initialization would go.
 */
void
_PG_init(void)
{
}
76
77 /* specify output plugin callbacks */
78 void
79 _PG_output_plugin_init(OutputPluginCallbacks *cb)
80 {
81         AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
82
83         cb->startup_cb = wal2sql_startup;
84         cb->begin_cb = wal2sql_begin_txn;
85         cb->change_cb = wal2sql_change;
86         cb->commit_cb = wal2sql_commit_txn;
87         cb->message_cb = wal2sql_message;
88         cb->shutdown_cb = wal2sql_shutdown;
89 }
90
91
92 /* initialize this plugin */
93 static void
94 wal2sql_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
95                                 bool is_init)
96 {
97         ListCell   *option;
98         Wal2SqlData *data;
99
100         data = palloc(sizeof(Wal2SqlData));
101         data->context = AllocSetContextCreate(ctx->context,
102                                                                                   "wal2sql context",
103                                                                                   ALLOCSET_DEFAULT_SIZES);
104         data->include_transaction = false;
105
106         ctx->output_plugin_private = data;
107
108         /* Default output format */
109         opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
110
111         foreach(option, ctx->output_plugin_options)
112         {
113                 DefElem    *elem = lfirst(option);
114
115                 Assert(elem->arg == NULL || IsA(elem->arg, String));
116
117                 if (strcmp(elem->defname, "include_transaction") == 0)
118                 {
119                         /* if option does not provide a value, it means its value is true */
120                         if (elem->arg == NULL)
121                                 data->include_transaction = true;
122                         else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
123                                 ereport(ERROR,
124                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
125                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
126                                                                 strVal(elem->arg), elem->defname)));
127                 }
128                 else if (strcmp(elem->defname, "output_format") == 0)
129                 {
130                         char       *format = NULL;
131
132                         if (elem->arg == NULL)
133                                 ereport(ERROR,
134                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
135                                                  errmsg("No value specified for parameter \"%s\"",
136                                                                 elem->defname)));
137
138                         format = strVal(elem->arg);
139
140                         if (strcmp(format, "textual") == 0)
141                                 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
142                         else if (strcmp(format, "binary") == 0)
143                                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
144                         else
145                                 ereport(ERROR,
146                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
147                                                  errmsg("Incorrect value \"%s\" for parameter \"%s\"",
148                                                                 format, elem->defname)));
149                 }
150                 else
151                 {
152                         ereport(ERROR,
153                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
154                                          errmsg("option \"%s\" = \"%s\" is unknown",
155                                                         elem->defname,
156                                                         elem->arg ? strVal(elem->arg) : "(null)")));
157                 }
158         }
159 }
160
161 /* cleanup this plugin's resources */
162 static void
163 wal2sql_shutdown(LogicalDecodingContext *ctx)
164 {
165         Wal2SqlData *data = ctx->output_plugin_private;
166
167         /* cleanup our own resources via memory context reset */
168         MemoryContextDelete(data->context);
169 }
170
171 /* BEGIN callback */
172 static void
173 wal2sql_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
174 {
175         Wal2SqlData *data = ctx->output_plugin_private;
176
177         /* Write to the plugin only if there is */
178         if (data->include_transaction)
179         {
180                 OutputPluginPrepareWrite(ctx, true);
181                 appendStringInfoString(ctx->out, "BEGIN;");
182                 OutputPluginWrite(ctx, true);
183         }
184 }
185
186 /* COMMIT callback */
187 static void
188 wal2sql_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
189                                    XLogRecPtr commit_lsn)
190 {
191         Wal2SqlData *data = ctx->output_plugin_private;
192
193         /* Write to the plugin only if there is */
194         if (data->include_transaction)
195         {
196                 OutputPluginPrepareWrite(ctx, true);
197                 appendStringInfoString(ctx->out, "COMMIT;");
198                 OutputPluginWrite(ctx, true);
199         }
200 }
201
202 /*
203  * Print literal `outputstr' already represented as string of type `typid'
204  * into stringbuf `s'.
205  *
206  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
207  * if standard_conforming_strings were enabled.
208  */
209 static void
210 print_literal(StringInfo s, Oid typid, char *outputstr)
211 {
212         const char *valptr;
213
214         switch (typid)
215         {
216                 case BOOLOID:
217                         if (outputstr[0] == 't')
218                                 appendStringInfoString(s, "true");
219                         else
220                                 appendStringInfoString(s, "false");
221                         break;
222
223                 case INT2OID:
224                 case INT4OID:
225                 case INT8OID:
226                 case OIDOID:
227                         /* NB: We don't care about Inf, NaN et al. */
228                         appendStringInfoString(s, outputstr);
229                         break;
230                 case FLOAT4OID:
231                 case FLOAT8OID:
232                 case NUMERICOID:
233
234                         /*
235                          * Numeric can have NaN. Float can have Nan, Infinity and
236                          * -Infinity. These need to be quoted.
237                          */
238                         if (strcmp(outputstr, "NaN") == 0 ||
239                                 strcmp(outputstr, "Infinity") == 0 ||
240                                 strcmp(outputstr, "-Infinity") == 0)
241                                 appendStringInfo(s, "'%s'", outputstr);
242                         else
243                                 appendStringInfoString(s, outputstr);
244                         break;
245                 case BITOID:
246                 case VARBITOID:
247                         appendStringInfo(s, "B'%s'", outputstr);
248                         break;
249
250                 default:
251                         appendStringInfoChar(s, '\'');
252                         for (valptr = outputstr; *valptr; valptr++)
253                         {
254                                 char            ch = *valptr;
255
256                                 if (SQL_STR_DOUBLE(ch, false))
257                                         appendStringInfoChar(s, ch);
258                                 appendStringInfoChar(s, ch);
259                         }
260                         appendStringInfoChar(s, '\'');
261                         break;
262         }
263 }
264
265 /*
266  * Print a relation name into the StringInfo provided by caller.
267  */
268 static void
269 print_relname(StringInfo s, Relation rel)
270 {
271         Form_pg_class class_form = RelationGetForm(rel);
272
273         appendStringInfoString(s,
274                                                    quote_qualified_identifier(
275                                                                                                           get_namespace_name(
276                                                                                                                                                  get_rel_namespace(RelationGetRelid(rel))),
277                                                                                                           NameStr(class_form->relname)));
278 }
279
280 /*
281  * Print a value into the StringInfo provided by caller.
282  */
283 static void
284 print_value(StringInfo s, Datum origval, Oid typid, bool isnull)
285 {
286         Oid                     typoutput;
287         bool            typisvarlena;
288
289         /* Query output function */
290         getTypeOutputInfo(typid,
291                                           &typoutput, &typisvarlena);
292
293         /* Print value */
294         if (isnull)
295                 appendStringInfoString(s, "null");
296         else if (typisvarlena &&
297                          VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
298         {
299                 /*
300                  * This should not happen, the column and its value can be skipped
301                  * properly. This code is let on purpose to avoid any traps in this
302                  * area in the future, generating useless queries in non-assert
303                  * builds.
304                  */
305                 Assert(0);
306                 appendStringInfoString(s, "unchanged-toast-datum");
307         }
308         else if (!typisvarlena)
309                 print_literal(s, typid,
310                                           OidOutputFunctionCall(typoutput, origval));
311         else
312         {
313                 /* Definitely detoasted Datum */
314                 Datum           val;
315
316                 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
317                 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
318         }
319 }
320
321 /*
322  * Print a WHERE clause item
323  */
324 static void
325 print_where_clause_item(StringInfo s,
326                                                 Relation relation,
327                                                 HeapTuple tuple,
328                                                 int natt,
329                                                 bool *first_column)
330 {
331         Form_pg_attribute attr;
332         Datum           origval;
333         bool            isnull;
334         TupleDesc       tupdesc = RelationGetDescr(relation);
335
336         attr = TupleDescAttr(tupdesc, natt - 1);
337
338         /* Skip dropped columns and system columns */
339         if (attr->attisdropped || attr->attnum < 0)
340                 return;
341
342         /* Skip comma for first colums */
343         if (!*first_column)
344                 appendStringInfoString(s, " AND ");
345         else
346                 *first_column = false;
347
348         /* Print attribute name */
349         appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
350
351         /* Get Datum from tuple */
352         origval = heap_getattr(tuple, natt, tupdesc, &isnull);
353
354         /* Get output function */
355         print_value(s, origval, attr->atttypid, isnull);
356 }
357
358 /*
359  * Generate a WHERE clause for UPDATE or DELETE.
360  */
361 static void
362 print_where_clause(StringInfo s,
363                                    Relation relation,
364                                    HeapTuple oldtuple,
365                                    HeapTuple newtuple)
366 {
367         TupleDesc       tupdesc = RelationGetDescr(relation);
368         int                     natt;
369         bool            first_column = true;
370
371         Assert(relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
372                    relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
373                    relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
374
375         /* Build the WHERE clause */
376         appendStringInfoString(s, " WHERE ");
377
378         RelationGetIndexList(relation);
379         /* Generate WHERE clause using new values of REPLICA IDENTITY */
380         if (OidIsValid(relation->rd_replidindex))
381         {
382                 Relation        indexRel;
383                 int                     key;
384
385                 /* Use all the values associated with the index */
386                 indexRel = index_open(relation->rd_replidindex, AccessShareLock);
387                 for (key = 0; key < indexRel->rd_index->indnatts; key++)
388                 {
389                         int                     relattr = indexRel->rd_index->indkey.values[key];
390
391                         /*
392                          * For a relation having REPLICA IDENTITY set at DEFAULT or INDEX,
393                          * if one of the columns used for tuple selectivity is changed,
394                          * the old tuple data is not NULL and need to be used for tuple
395                          * selectivity. If no such columns are updated, old tuple data is
396                          * NULL.
397                          */
398                         print_where_clause_item(s, relation,
399                                                                         oldtuple ? oldtuple : newtuple,
400                                                                         relattr, &first_column);
401                 }
402                 index_close(indexRel, NoLock);
403                 return;
404         }
405
406         /* We need absolutely some values for tuple selectivity now */
407         Assert(oldtuple != NULL &&
408                    relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
409
410         /*
411          * Fallback to default case, use of old values and print WHERE clause
412          * using all the columns. This is actually the code path for FULL.
413          */
414         for (natt = 0; natt < tupdesc->natts; natt++)
415                 print_where_clause_item(s, relation, oldtuple,
416                                                                 natt + 1, &first_column);
417 }
418
419 /*
420  * Decode an INSERT entry
421  */
422 static void
423 wal2sql_insert(StringInfo s,
424                            Relation relation,
425                            HeapTuple tuple)
426 {
427         TupleDesc       tupdesc = RelationGetDescr(relation);
428         int                     natt;
429         bool            first_column = true;
430         StringInfo      values = makeStringInfo();
431
432         /* Initialize string info for values */
433         initStringInfo(values);
434
435         /* Query header */
436         appendStringInfo(s, "INSERT INTO ");
437         print_relname(s, relation);
438         appendStringInfo(s, " (");
439
440         /* Build column names and values */
441         for (natt = 0; natt < tupdesc->natts; natt++)
442         {
443                 Form_pg_attribute attr;
444                 Datum           origval;
445                 bool            isnull;
446
447                 attr = TupleDescAttr(tupdesc, natt);
448
449                 /* Skip dropped columns and system columns */
450                 if (attr->attisdropped || attr->attnum < 0)
451                         continue;
452
453                 /* Skip comma for first colums */
454                 if (!first_column)
455                 {
456                         appendStringInfoString(s, ", ");
457                         appendStringInfoString(values, ", ");
458                 }
459                 else
460                         first_column = false;
461
462                 /* Print attribute name */
463                 appendStringInfo(s, "%s", quote_identifier(NameStr(attr->attname)));
464
465                 /* Get Datum from tuple */
466                 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
467
468                 /* Get output function */
469                 print_value(values, origval, attr->atttypid, isnull);
470         }
471
472         /* Append values  */
473         appendStringInfo(s, ") VALUES (%s);", values->data);
474
475         /* Clean up */
476         resetStringInfo(values);
477 }
478
479 /*
480  * Decode a DELETE entry
481  */
482 static void
483 wal2sql_delete(StringInfo s,
484                            Relation relation,
485                            HeapTuple tuple)
486 {
487         appendStringInfo(s, "DELETE FROM ");
488         print_relname(s, relation);
489
490         /*
491          * Here the same tuple is used as old and new values, selectivity will be
492          * properly reduced by relation uses DEFAULT or INDEX as REPLICA IDENTITY.
493          */
494         print_where_clause(s, relation, tuple, tuple);
495         appendStringInfoString(s, ";");
496 }
497
498
499 /*
500  * Decode an UPDATE entry
501  */
502 static void
503 wal2sql_update(StringInfo s,
504                            Relation relation,
505                            HeapTuple oldtuple,
506                            HeapTuple newtuple)
507 {
508         TupleDesc       tupdesc = RelationGetDescr(relation);
509         int                     natt;
510         bool            first_column = true;
511
512         /* If there are no new values, simply leave as there is nothing to do */
513         if (newtuple == NULL)
514                 return;
515
516         appendStringInfo(s, "UPDATE ");
517         print_relname(s, relation);
518
519         /* Build the SET clause with the new values */
520         appendStringInfo(s, " SET ");
521         for (natt = 0; natt < tupdesc->natts; natt++)
522         {
523                 Form_pg_attribute attr;
524                 Datum           origval;
525                 bool            isnull;
526                 Oid                     typoutput;
527                 bool            typisvarlena;
528
529                 attr = TupleDescAttr(tupdesc, natt);
530
531                 /* Skip dropped columns and system columns */
532                 if (attr->attisdropped || attr->attnum < 0)
533                         continue;
534
535                 /* Get Datum from tuple */
536                 origval = heap_getattr(newtuple, natt + 1, tupdesc, &isnull);
537
538                 /* Get output type so we can know if it's varlena */
539                 getTypeOutputInfo(attr->atttypid,
540                                                   &typoutput, &typisvarlena);
541
542                 /*
543                  * TOASTed datum, but it is not changed so it can be skipped this in
544                  * the SET clause of this UPDATE query.
545                  */
546                 if (!isnull && typisvarlena &&
547                         VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
548                         continue;
549
550                 /* Skip comma for first colums */
551                 if (!first_column)
552                 {
553                         appendStringInfoString(s, ", ");
554                 }
555                 else
556                         first_column = false;
557
558                 /* Print attribute name */
559                 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
560
561                 /* Get output function */
562                 print_value(s, origval, attr->atttypid, isnull);
563         }
564
565         /* Print WHERE clause */
566         print_where_clause(s, relation, oldtuple, newtuple);
567
568         appendStringInfoString(s, ";");
569 }
570
571 /*
572  * Callback for individual changed tuples
573  */
574 static void
575 wal2sql_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
576                            Relation relation, ReorderBufferChange *change)
577 {
578         Wal2SqlData *data;
579         MemoryContext old;
580         char            replident = relation->rd_rel->relreplident;
581         bool            is_rel_non_selective;
582
583         data = ctx->output_plugin_private;
584
585         /* Avoid leaking memory by using and resetting our own context */
586         old = MemoryContextSwitchTo(data->context);
587
588         /*
589          * Determine if relation is selective enough for WHERE clause generation
590          * in UPDATE and DELETE cases. A non-selective relation uses REPLICA
591          * IDENTITY set as NOTHING, or DEFAULT without an available replica
592          * identity index.
593          */
594         RelationGetIndexList(relation);
595         is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
596                                                         (replident == REPLICA_IDENTITY_DEFAULT &&
597                                                          !OidIsValid(relation->rd_replidindex)));
598
599         /* Decode entry depending on its type */
600         switch (change->action)
601         {
602                 case REORDER_BUFFER_CHANGE_INSERT:
603                         if (change->data.tp.newtuple != NULL)
604                         {
605                                 OutputPluginPrepareWrite(ctx, true);
606                                 wal2sql_insert(ctx->out,
607                                                            relation,
608                                                            change->data.tp.newtuple);
609                                 OutputPluginWrite(ctx, true);
610                         }
611                         break;
612                 case REORDER_BUFFER_CHANGE_UPDATE:
613                         if (!is_rel_non_selective)
614                         {
615                                 HeapTuple       oldtuple = change->data.tp.oldtuple;
616                                 HeapTuple       newtuple = change->data.tp.newtuple;
617
618                                 OutputPluginPrepareWrite(ctx, true);
619                                 wal2sql_update(ctx->out,
620                                                            relation,
621                                                            oldtuple,
622                                                            newtuple);
623                                 OutputPluginWrite(ctx, true);
624                         }
625                         break;
626                 case REORDER_BUFFER_CHANGE_DELETE:
627                         if (!is_rel_non_selective)
628                         {
629                                 OutputPluginPrepareWrite(ctx, true);
630                                 wal2sql_delete(ctx->out,
631                                                            relation,
632                                                            change->data.tp.oldtuple);
633                                 OutputPluginWrite(ctx, true);
634                         }
635                         break;
636                 default:
637                         /* Should not come here */
638                         Assert(0);
639                         break;
640         }
641
642         MemoryContextSwitchTo(old);
643         MemoryContextReset(data->context);
644 }
645
646 /*
647  * Callback for generic logical decoding messages.
648  *
649  * This callback handles messages emitted via pg_logical_emit_message().
650  * We use this to capture DDL commands that are emitted by event triggers.
651  * The event triggers emit messages with prefix 'ddl' containing the DDL
652  * statement text.
653  */
654 static void
655 wal2sql_message(LogicalDecodingContext *ctx,
656                                 ReorderBufferTXN *txn,
657                                 XLogRecPtr message_lsn,
658                                 bool transactional,
659                                 const char *prefix,
660                                 Size message_size,
661                                 const char *message)
662 {
663         /*
664          * Only process messages with the 'ddl' prefix. These are DDL statements
665          * emitted by our event triggers using pg_logical_emit_message().
666          */
667         if (strcmp(prefix, "ddl") != 0)
668                 return;
669
670         /*
671          * Output the DDL statement. The message already contains the complete
672          * DDL command in executable SQL format (e.g., "ALTER TABLE foo ADD COLUMN bar TEXT;")
673          * so we output it as-is.
674          *
675          * Use appendBinaryStringInfo to properly handle the message with its size,
676          * as the message may not be null-terminated.
677          */
678         OutputPluginPrepareWrite(ctx, true);
679         appendBinaryStringInfo(ctx->out, message, message_size);
680         OutputPluginWrite(ctx, true);
681 }