]> begriffs open source - pg_scribe/blob - wal2sql/wal2sql.c
Internal rename of forked extension
[pg_scribe] / wal2sql / wal2sql.c
1 /*-------------------------------------------------------------------------
2  *
3  * wal2sql.c
4  *              Logical decoding output plugin generating SQL queries based
5  *              on things decoded.
6  *
7  * Forked from decoder_raw by Michael Paquier
8  * https://github.com/michaelpq/pg_plugins/tree/main/decoder_raw
9  *
10  * Copyright (c) 1996-2025, PostgreSQL Global Development Group
11  *
12  * IDENTIFICATION
13  *                wal2sql/wal2sql.c
14  *
15  *-------------------------------------------------------------------------
16  */
17
18 #include "postgres.h"
19
20 #include "access/genam.h"
21 #include "access/sysattr.h"
22 #include "catalog/pg_class.h"
23 #include "catalog/pg_type.h"
24 #include "nodes/parsenodes.h"
25 #include "replication/output_plugin.h"
26 #include "replication/logical.h"
27 #include "utils/builtins.h"
28 #include "utils/lsyscache.h"
29 #include "utils/memutils.h"
30 #include "utils/rel.h"
31 #include "utils/relcache.h"
32 #include "utils/syscache.h"
33 #include "utils/typcache.h"
34
35
36 PG_MODULE_MAGIC;
37
38 /* These must be available to pg_dlsym() */
39 extern void _PG_init(void);
40 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
41
42 /*
43  * Structure storing the plugin specifications and options.
44  */
45 typedef struct
46 {
47         MemoryContext context;
48         bool            include_transaction;
49 }                       Wal2SqlData;
50
51 static void wal2sql_startup(LogicalDecodingContext *ctx,
52                                                         OutputPluginOptions *opt,
53                                                         bool is_init);
54 static void wal2sql_shutdown(LogicalDecodingContext *ctx);
55 static void wal2sql_begin_txn(LogicalDecodingContext *ctx,
56                                                           ReorderBufferTXN *txn);
57 static void wal2sql_commit_txn(LogicalDecodingContext *ctx,
58                                                            ReorderBufferTXN *txn,
59                                                            XLogRecPtr commit_lsn);
60 static void wal2sql_change(LogicalDecodingContext *ctx,
61                                                    ReorderBufferTXN *txn, Relation rel,
62                                                    ReorderBufferChange *change);
63
/*
 * Library load hook.  wal2sql needs no load-time initialization; the hook
 * is kept as a place where such setup could be added.
 */
void
_PG_init(void)
{
}
69
70 /* specify output plugin callbacks */
71 void
72 _PG_output_plugin_init(OutputPluginCallbacks *cb)
73 {
74         AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
75
76         cb->startup_cb = wal2sql_startup;
77         cb->begin_cb = wal2sql_begin_txn;
78         cb->change_cb = wal2sql_change;
79         cb->commit_cb = wal2sql_commit_txn;
80         cb->shutdown_cb = wal2sql_shutdown;
81 }
82
83
84 /* initialize this plugin */
85 static void
86 wal2sql_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
87                                 bool is_init)
88 {
89         ListCell   *option;
90         Wal2SqlData *data;
91
92         data = palloc(sizeof(Wal2SqlData));
93         data->context = AllocSetContextCreate(ctx->context,
94                                                                                   "wal2sql context",
95                                                                                   ALLOCSET_DEFAULT_SIZES);
96         data->include_transaction = false;
97
98         ctx->output_plugin_private = data;
99
100         /* Default output format */
101         opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
102
103         foreach(option, ctx->output_plugin_options)
104         {
105                 DefElem    *elem = lfirst(option);
106
107                 Assert(elem->arg == NULL || IsA(elem->arg, String));
108
109                 if (strcmp(elem->defname, "include_transaction") == 0)
110                 {
111                         /* if option does not provide a value, it means its value is true */
112                         if (elem->arg == NULL)
113                                 data->include_transaction = true;
114                         else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
115                                 ereport(ERROR,
116                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
118                                                                 strVal(elem->arg), elem->defname)));
119                 }
120                 else if (strcmp(elem->defname, "output_format") == 0)
121                 {
122                         char       *format = NULL;
123
124                         if (elem->arg == NULL)
125                                 ereport(ERROR,
126                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
127                                                  errmsg("No value specified for parameter \"%s\"",
128                                                                 elem->defname)));
129
130                         format = strVal(elem->arg);
131
132                         if (strcmp(format, "textual") == 0)
133                                 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
134                         else if (strcmp(format, "binary") == 0)
135                                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
136                         else
137                                 ereport(ERROR,
138                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
139                                                  errmsg("Incorrect value \"%s\" for parameter \"%s\"",
140                                                                 format, elem->defname)));
141                 }
142                 else
143                 {
144                         ereport(ERROR,
145                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
146                                          errmsg("option \"%s\" = \"%s\" is unknown",
147                                                         elem->defname,
148                                                         elem->arg ? strVal(elem->arg) : "(null)")));
149                 }
150         }
151 }
152
153 /* cleanup this plugin's resources */
154 static void
155 wal2sql_shutdown(LogicalDecodingContext *ctx)
156 {
157         Wal2SqlData *data = ctx->output_plugin_private;
158
159         /* cleanup our own resources via memory context reset */
160         MemoryContextDelete(data->context);
161 }
162
163 /* BEGIN callback */
164 static void
165 wal2sql_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
166 {
167         Wal2SqlData *data = ctx->output_plugin_private;
168
169         /* Write to the plugin only if there is */
170         if (data->include_transaction)
171         {
172                 OutputPluginPrepareWrite(ctx, true);
173                 appendStringInfoString(ctx->out, "BEGIN;");
174                 OutputPluginWrite(ctx, true);
175         }
176 }
177
178 /* COMMIT callback */
179 static void
180 wal2sql_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
181                                    XLogRecPtr commit_lsn)
182 {
183         Wal2SqlData *data = ctx->output_plugin_private;
184
185         /* Write to the plugin only if there is */
186         if (data->include_transaction)
187         {
188                 OutputPluginPrepareWrite(ctx, true);
189                 appendStringInfoString(ctx->out, "COMMIT;");
190                 OutputPluginWrite(ctx, true);
191         }
192 }
193
194 /*
195  * Print literal `outputstr' already represented as string of type `typid'
196  * into stringbuf `s'.
197  *
198  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
199  * if standard_conforming_strings were enabled.
200  */
201 static void
202 print_literal(StringInfo s, Oid typid, char *outputstr)
203 {
204         const char *valptr;
205
206         switch (typid)
207         {
208                 case BOOLOID:
209                         if (outputstr[0] == 't')
210                                 appendStringInfoString(s, "true");
211                         else
212                                 appendStringInfoString(s, "false");
213                         break;
214
215                 case INT2OID:
216                 case INT4OID:
217                 case INT8OID:
218                 case OIDOID:
219                         /* NB: We don't care about Inf, NaN et al. */
220                         appendStringInfoString(s, outputstr);
221                         break;
222                 case FLOAT4OID:
223                 case FLOAT8OID:
224                 case NUMERICOID:
225
226                         /*
227                          * Numeric can have NaN. Float can have Nan, Infinity and
228                          * -Infinity. These need to be quoted.
229                          */
230                         if (strcmp(outputstr, "NaN") == 0 ||
231                                 strcmp(outputstr, "Infinity") == 0 ||
232                                 strcmp(outputstr, "-Infinity") == 0)
233                                 appendStringInfo(s, "'%s'", outputstr);
234                         else
235                                 appendStringInfoString(s, outputstr);
236                         break;
237                 case BITOID:
238                 case VARBITOID:
239                         appendStringInfo(s, "B'%s'", outputstr);
240                         break;
241
242                 default:
243                         appendStringInfoChar(s, '\'');
244                         for (valptr = outputstr; *valptr; valptr++)
245                         {
246                                 char            ch = *valptr;
247
248                                 if (SQL_STR_DOUBLE(ch, false))
249                                         appendStringInfoChar(s, ch);
250                                 appendStringInfoChar(s, ch);
251                         }
252                         appendStringInfoChar(s, '\'');
253                         break;
254         }
255 }
256
257 /*
258  * Print a relation name into the StringInfo provided by caller.
259  */
260 static void
261 print_relname(StringInfo s, Relation rel)
262 {
263         Form_pg_class class_form = RelationGetForm(rel);
264
265         appendStringInfoString(s,
266                                                    quote_qualified_identifier(
267                                                                                                           get_namespace_name(
268                                                                                                                                                  get_rel_namespace(RelationGetRelid(rel))),
269                                                                                                           NameStr(class_form->relname)));
270 }
271
272 /*
273  * Print a value into the StringInfo provided by caller.
274  */
275 static void
276 print_value(StringInfo s, Datum origval, Oid typid, bool isnull)
277 {
278         Oid                     typoutput;
279         bool            typisvarlena;
280
281         /* Query output function */
282         getTypeOutputInfo(typid,
283                                           &typoutput, &typisvarlena);
284
285         /* Print value */
286         if (isnull)
287                 appendStringInfoString(s, "null");
288         else if (typisvarlena &&
289                          VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
290         {
291                 /*
292                  * This should not happen, the column and its value can be skipped
293                  * properly. This code is let on purpose to avoid any traps in this
294                  * area in the future, generating useless queries in non-assert
295                  * builds.
296                  */
297                 Assert(0);
298                 appendStringInfoString(s, "unchanged-toast-datum");
299         }
300         else if (!typisvarlena)
301                 print_literal(s, typid,
302                                           OidOutputFunctionCall(typoutput, origval));
303         else
304         {
305                 /* Definitely detoasted Datum */
306                 Datum           val;
307
308                 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
309                 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
310         }
311 }
312
313 /*
314  * Print a WHERE clause item
315  */
316 static void
317 print_where_clause_item(StringInfo s,
318                                                 Relation relation,
319                                                 HeapTuple tuple,
320                                                 int natt,
321                                                 bool *first_column)
322 {
323         Form_pg_attribute attr;
324         Datum           origval;
325         bool            isnull;
326         TupleDesc       tupdesc = RelationGetDescr(relation);
327
328         attr = TupleDescAttr(tupdesc, natt - 1);
329
330         /* Skip dropped columns and system columns */
331         if (attr->attisdropped || attr->attnum < 0)
332                 return;
333
334         /* Skip comma for first colums */
335         if (!*first_column)
336                 appendStringInfoString(s, " AND ");
337         else
338                 *first_column = false;
339
340         /* Print attribute name */
341         appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
342
343         /* Get Datum from tuple */
344         origval = heap_getattr(tuple, natt, tupdesc, &isnull);
345
346         /* Get output function */
347         print_value(s, origval, attr->atttypid, isnull);
348 }
349
350 /*
351  * Generate a WHERE clause for UPDATE or DELETE.
352  */
353 static void
354 print_where_clause(StringInfo s,
355                                    Relation relation,
356                                    HeapTuple oldtuple,
357                                    HeapTuple newtuple)
358 {
359         TupleDesc       tupdesc = RelationGetDescr(relation);
360         int                     natt;
361         bool            first_column = true;
362
363         Assert(relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
364                    relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
365                    relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
366
367         /* Build the WHERE clause */
368         appendStringInfoString(s, " WHERE ");
369
370         RelationGetIndexList(relation);
371         /* Generate WHERE clause using new values of REPLICA IDENTITY */
372         if (OidIsValid(relation->rd_replidindex))
373         {
374                 Relation        indexRel;
375                 int                     key;
376
377                 /* Use all the values associated with the index */
378                 indexRel = index_open(relation->rd_replidindex, AccessShareLock);
379                 for (key = 0; key < indexRel->rd_index->indnatts; key++)
380                 {
381                         int                     relattr = indexRel->rd_index->indkey.values[key];
382
383                         /*
384                          * For a relation having REPLICA IDENTITY set at DEFAULT or INDEX,
385                          * if one of the columns used for tuple selectivity is changed,
386                          * the old tuple data is not NULL and need to be used for tuple
387                          * selectivity. If no such columns are updated, old tuple data is
388                          * NULL.
389                          */
390                         print_where_clause_item(s, relation,
391                                                                         oldtuple ? oldtuple : newtuple,
392                                                                         relattr, &first_column);
393                 }
394                 index_close(indexRel, NoLock);
395                 return;
396         }
397
398         /* We need absolutely some values for tuple selectivity now */
399         Assert(oldtuple != NULL &&
400                    relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
401
402         /*
403          * Fallback to default case, use of old values and print WHERE clause
404          * using all the columns. This is actually the code path for FULL.
405          */
406         for (natt = 0; natt < tupdesc->natts; natt++)
407                 print_where_clause_item(s, relation, oldtuple,
408                                                                 natt + 1, &first_column);
409 }
410
411 /*
412  * Decode an INSERT entry
413  */
414 static void
415 wal2sql_insert(StringInfo s,
416                            Relation relation,
417                            HeapTuple tuple)
418 {
419         TupleDesc       tupdesc = RelationGetDescr(relation);
420         int                     natt;
421         bool            first_column = true;
422         StringInfo      values = makeStringInfo();
423
424         /* Initialize string info for values */
425         initStringInfo(values);
426
427         /* Query header */
428         appendStringInfo(s, "INSERT INTO ");
429         print_relname(s, relation);
430         appendStringInfo(s, " (");
431
432         /* Build column names and values */
433         for (natt = 0; natt < tupdesc->natts; natt++)
434         {
435                 Form_pg_attribute attr;
436                 Datum           origval;
437                 bool            isnull;
438
439                 attr = TupleDescAttr(tupdesc, natt);
440
441                 /* Skip dropped columns and system columns */
442                 if (attr->attisdropped || attr->attnum < 0)
443                         continue;
444
445                 /* Skip comma for first colums */
446                 if (!first_column)
447                 {
448                         appendStringInfoString(s, ", ");
449                         appendStringInfoString(values, ", ");
450                 }
451                 else
452                         first_column = false;
453
454                 /* Print attribute name */
455                 appendStringInfo(s, "%s", quote_identifier(NameStr(attr->attname)));
456
457                 /* Get Datum from tuple */
458                 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
459
460                 /* Get output function */
461                 print_value(values, origval, attr->atttypid, isnull);
462         }
463
464         /* Append values  */
465         appendStringInfo(s, ") VALUES (%s);", values->data);
466
467         /* Clean up */
468         resetStringInfo(values);
469 }
470
471 /*
472  * Decode a DELETE entry
473  */
474 static void
475 wal2sql_delete(StringInfo s,
476                            Relation relation,
477                            HeapTuple tuple)
478 {
479         appendStringInfo(s, "DELETE FROM ");
480         print_relname(s, relation);
481
482         /*
483          * Here the same tuple is used as old and new values, selectivity will be
484          * properly reduced by relation uses DEFAULT or INDEX as REPLICA IDENTITY.
485          */
486         print_where_clause(s, relation, tuple, tuple);
487         appendStringInfoString(s, ";");
488 }
489
490
491 /*
492  * Decode an UPDATE entry
493  */
494 static void
495 wal2sql_update(StringInfo s,
496                            Relation relation,
497                            HeapTuple oldtuple,
498                            HeapTuple newtuple)
499 {
500         TupleDesc       tupdesc = RelationGetDescr(relation);
501         int                     natt;
502         bool            first_column = true;
503
504         /* If there are no new values, simply leave as there is nothing to do */
505         if (newtuple == NULL)
506                 return;
507
508         appendStringInfo(s, "UPDATE ");
509         print_relname(s, relation);
510
511         /* Build the SET clause with the new values */
512         appendStringInfo(s, " SET ");
513         for (natt = 0; natt < tupdesc->natts; natt++)
514         {
515                 Form_pg_attribute attr;
516                 Datum           origval;
517                 bool            isnull;
518                 Oid                     typoutput;
519                 bool            typisvarlena;
520
521                 attr = TupleDescAttr(tupdesc, natt);
522
523                 /* Skip dropped columns and system columns */
524                 if (attr->attisdropped || attr->attnum < 0)
525                         continue;
526
527                 /* Get Datum from tuple */
528                 origval = heap_getattr(newtuple, natt + 1, tupdesc, &isnull);
529
530                 /* Get output type so we can know if it's varlena */
531                 getTypeOutputInfo(attr->atttypid,
532                                                   &typoutput, &typisvarlena);
533
534                 /*
535                  * TOASTed datum, but it is not changed so it can be skipped this in
536                  * the SET clause of this UPDATE query.
537                  */
538                 if (!isnull && typisvarlena &&
539                         VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
540                         continue;
541
542                 /* Skip comma for first colums */
543                 if (!first_column)
544                 {
545                         appendStringInfoString(s, ", ");
546                 }
547                 else
548                         first_column = false;
549
550                 /* Print attribute name */
551                 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
552
553                 /* Get output function */
554                 print_value(s, origval, attr->atttypid, isnull);
555         }
556
557         /* Print WHERE clause */
558         print_where_clause(s, relation, oldtuple, newtuple);
559
560         appendStringInfoString(s, ";");
561 }
562
563 /*
564  * Callback for individual changed tuples
565  */
566 static void
567 wal2sql_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
568                            Relation relation, ReorderBufferChange *change)
569 {
570         Wal2SqlData *data;
571         MemoryContext old;
572         char            replident = relation->rd_rel->relreplident;
573         bool            is_rel_non_selective;
574
575         data = ctx->output_plugin_private;
576
577         /* Avoid leaking memory by using and resetting our own context */
578         old = MemoryContextSwitchTo(data->context);
579
580         /*
581          * Determine if relation is selective enough for WHERE clause generation
582          * in UPDATE and DELETE cases. A non-selective relation uses REPLICA
583          * IDENTITY set as NOTHING, or DEFAULT without an available replica
584          * identity index.
585          */
586         RelationGetIndexList(relation);
587         is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
588                                                         (replident == REPLICA_IDENTITY_DEFAULT &&
589                                                          !OidIsValid(relation->rd_replidindex)));
590
591         /* Decode entry depending on its type */
592         switch (change->action)
593         {
594                 case REORDER_BUFFER_CHANGE_INSERT:
595                         if (change->data.tp.newtuple != NULL)
596                         {
597                                 OutputPluginPrepareWrite(ctx, true);
598                                 wal2sql_insert(ctx->out,
599                                                            relation,
600                                                            change->data.tp.newtuple);
601                                 OutputPluginWrite(ctx, true);
602                         }
603                         break;
604                 case REORDER_BUFFER_CHANGE_UPDATE:
605                         if (!is_rel_non_selective)
606                         {
607                                 HeapTuple       oldtuple = change->data.tp.oldtuple;
608                                 HeapTuple       newtuple = change->data.tp.newtuple;
609
610                                 OutputPluginPrepareWrite(ctx, true);
611                                 wal2sql_update(ctx->out,
612                                                            relation,
613                                                            oldtuple,
614                                                            newtuple);
615                                 OutputPluginWrite(ctx, true);
616                         }
617                         break;
618                 case REORDER_BUFFER_CHANGE_DELETE:
619                         if (!is_rel_non_selective)
620                         {
621                                 OutputPluginPrepareWrite(ctx, true);
622                                 wal2sql_delete(ctx->out,
623                                                            relation,
624                                                            change->data.tp.oldtuple);
625                                 OutputPluginWrite(ctx, true);
626                         }
627                         break;
628                 default:
629                         /* Should not come here */
630                         Assert(0);
631                         break;
632         }
633
634         MemoryContextSwitchTo(old);
635         MemoryContextReset(data->context);
636 }