begriffs open source - pg_scribe/blob - wal2sql/decoder_raw.c
Begin working on the fork of Michael Paquier's decoder_raw extension
[pg_scribe] / wal2sql / decoder_raw.c
1 /*-------------------------------------------------------------------------
2  *
3  * decoder_raw.c
4  *              Logical decoding output plugin generating SQL queries based
5  *              on things decoded.
6  *
7  * Copyright (c) 1996-2025, PostgreSQL Global Development Group
8  *
9  * IDENTIFICATION
10  *                decoder_raw/decoder_raw.c
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "catalog/pg_type.h"
21 #include "nodes/parsenodes.h"
22 #include "replication/output_plugin.h"
23 #include "replication/logical.h"
24 #include "utils/builtins.h"
25 #include "utils/lsyscache.h"
26 #include "utils/memutils.h"
27 #include "utils/rel.h"
28 #include "utils/relcache.h"
29 #include "utils/syscache.h"
30 #include "utils/typcache.h"
31
32
33 PG_MODULE_MAGIC;
34
35 /* These must be available to pg_dlsym() */
36 extern void _PG_init(void);
37 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
38
39 /*
40  * Structure storing the plugin specifications and options.
41  */
42 typedef struct
43 {
44         MemoryContext context;
45         bool            include_transaction;
46 }                       DecoderRawData;
47
48 static void decoder_raw_startup(LogicalDecodingContext *ctx,
49                                                                 OutputPluginOptions *opt,
50                                                                 bool is_init);
51 static void decoder_raw_shutdown(LogicalDecodingContext *ctx);
52 static void decoder_raw_begin_txn(LogicalDecodingContext *ctx,
53                                                                   ReorderBufferTXN *txn);
54 static void decoder_raw_commit_txn(LogicalDecodingContext *ctx,
55                                                                    ReorderBufferTXN *txn,
56                                                                    XLogRecPtr commit_lsn);
57 static void decoder_raw_change(LogicalDecodingContext *ctx,
58                                                            ReorderBufferTXN *txn, Relation rel,
59                                                            ReorderBufferChange *change);
60
/*
 * Module load hook.  This plugin needs no load-time setup; everything it
 * uses is created per decoding session in decoder_raw_startup().
 */
void
_PG_init(void)
{
}
66
67 /* specify output plugin callbacks */
68 void
69 _PG_output_plugin_init(OutputPluginCallbacks *cb)
70 {
71         AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
72
73         cb->startup_cb = decoder_raw_startup;
74         cb->begin_cb = decoder_raw_begin_txn;
75         cb->change_cb = decoder_raw_change;
76         cb->commit_cb = decoder_raw_commit_txn;
77         cb->shutdown_cb = decoder_raw_shutdown;
78 }
79
80
81 /* initialize this plugin */
82 static void
83 decoder_raw_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
84                                         bool is_init)
85 {
86         ListCell   *option;
87         DecoderRawData *data;
88
89         data = palloc(sizeof(DecoderRawData));
90         data->context = AllocSetContextCreate(ctx->context,
91                                                                                   "Raw decoder context",
92                                                                                   ALLOCSET_DEFAULT_SIZES);
93         data->include_transaction = false;
94
95         ctx->output_plugin_private = data;
96
97         /* Default output format */
98         opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
99
100         foreach(option, ctx->output_plugin_options)
101         {
102                 DefElem    *elem = lfirst(option);
103
104                 Assert(elem->arg == NULL || IsA(elem->arg, String));
105
106                 if (strcmp(elem->defname, "include_transaction") == 0)
107                 {
108                         /* if option does not provide a value, it means its value is true */
109                         if (elem->arg == NULL)
110                                 data->include_transaction = true;
111                         else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
112                                 ereport(ERROR,
113                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
114                                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",
115                                                                 strVal(elem->arg), elem->defname)));
116                 }
117                 else if (strcmp(elem->defname, "output_format") == 0)
118                 {
119                         char       *format = NULL;
120
121                         if (elem->arg == NULL)
122                                 ereport(ERROR,
123                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
124                                                  errmsg("No value specified for parameter \"%s\"",
125                                                                 elem->defname)));
126
127                         format = strVal(elem->arg);
128
129                         if (strcmp(format, "textual") == 0)
130                                 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
131                         else if (strcmp(format, "binary") == 0)
132                                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
133                         else
134                                 ereport(ERROR,
135                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
136                                                  errmsg("Incorrect value \"%s\" for parameter \"%s\"",
137                                                                 format, elem->defname)));
138                 }
139                 else
140                 {
141                         ereport(ERROR,
142                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
143                                          errmsg("option \"%s\" = \"%s\" is unknown",
144                                                         elem->defname,
145                                                         elem->arg ? strVal(elem->arg) : "(null)")));
146                 }
147         }
148 }
149
150 /* cleanup this plugin's resources */
151 static void
152 decoder_raw_shutdown(LogicalDecodingContext *ctx)
153 {
154         DecoderRawData *data = ctx->output_plugin_private;
155
156         /* cleanup our own resources via memory context reset */
157         MemoryContextDelete(data->context);
158 }
159
160 /* BEGIN callback */
161 static void
162 decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
163 {
164         DecoderRawData *data = ctx->output_plugin_private;
165
166         /* Write to the plugin only if there is */
167         if (data->include_transaction)
168         {
169                 OutputPluginPrepareWrite(ctx, true);
170                 appendStringInfoString(ctx->out, "BEGIN;");
171                 OutputPluginWrite(ctx, true);
172         }
173 }
174
175 /* COMMIT callback */
176 static void
177 decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
178                                            XLogRecPtr commit_lsn)
179 {
180         DecoderRawData *data = ctx->output_plugin_private;
181
182         /* Write to the plugin only if there is */
183         if (data->include_transaction)
184         {
185                 OutputPluginPrepareWrite(ctx, true);
186                 appendStringInfoString(ctx->out, "COMMIT;");
187                 OutputPluginWrite(ctx, true);
188         }
189 }
190
191 /*
192  * Print literal `outputstr' already represented as string of type `typid'
193  * into stringbuf `s'.
194  *
195  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
196  * if standard_conforming_strings were enabled.
197  */
198 static void
199 print_literal(StringInfo s, Oid typid, char *outputstr)
200 {
201         const char *valptr;
202
203         switch (typid)
204         {
205                 case BOOLOID:
206                         if (outputstr[0] == 't')
207                                 appendStringInfoString(s, "true");
208                         else
209                                 appendStringInfoString(s, "false");
210                         break;
211
212                 case INT2OID:
213                 case INT4OID:
214                 case INT8OID:
215                 case OIDOID:
216                         /* NB: We don't care about Inf, NaN et al. */
217                         appendStringInfoString(s, outputstr);
218                         break;
219                 case FLOAT4OID:
220                 case FLOAT8OID:
221                 case NUMERICOID:
222
223                         /*
224                          * Numeric can have NaN. Float can have Nan, Infinity and
225                          * -Infinity. These need to be quoted.
226                          */
227                         if (strcmp(outputstr, "NaN") == 0 ||
228                                 strcmp(outputstr, "Infinity") == 0 ||
229                                 strcmp(outputstr, "-Infinity") == 0)
230                                 appendStringInfo(s, "'%s'", outputstr);
231                         else
232                                 appendStringInfoString(s, outputstr);
233                         break;
234                 case BITOID:
235                 case VARBITOID:
236                         appendStringInfo(s, "B'%s'", outputstr);
237                         break;
238
239                 default:
240                         appendStringInfoChar(s, '\'');
241                         for (valptr = outputstr; *valptr; valptr++)
242                         {
243                                 char            ch = *valptr;
244
245                                 if (SQL_STR_DOUBLE(ch, false))
246                                         appendStringInfoChar(s, ch);
247                                 appendStringInfoChar(s, ch);
248                         }
249                         appendStringInfoChar(s, '\'');
250                         break;
251         }
252 }
253
254 /*
255  * Print a relation name into the StringInfo provided by caller.
256  */
257 static void
258 print_relname(StringInfo s, Relation rel)
259 {
260         Form_pg_class class_form = RelationGetForm(rel);
261
262         appendStringInfoString(s,
263                                                    quote_qualified_identifier(
264                                                                                                           get_namespace_name(
265                                                                                                                                                  get_rel_namespace(RelationGetRelid(rel))),
266                                                                                                           NameStr(class_form->relname)));
267 }
268
269 /*
270  * Print a value into the StringInfo provided by caller.
271  */
272 static void
273 print_value(StringInfo s, Datum origval, Oid typid, bool isnull)
274 {
275         Oid                     typoutput;
276         bool            typisvarlena;
277
278         /* Query output function */
279         getTypeOutputInfo(typid,
280                                           &typoutput, &typisvarlena);
281
282         /* Print value */
283         if (isnull)
284                 appendStringInfoString(s, "null");
285         else if (typisvarlena &&
286                          VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
287         {
288                 /*
289                  * This should not happen, the column and its value can be skipped
290                  * properly. This code is let on purpose to avoid any traps in this
291                  * area in the future, generating useless queries in non-assert
292                  * builds.
293                  */
294                 Assert(0);
295                 appendStringInfoString(s, "unchanged-toast-datum");
296         }
297         else if (!typisvarlena)
298                 print_literal(s, typid,
299                                           OidOutputFunctionCall(typoutput, origval));
300         else
301         {
302                 /* Definitely detoasted Datum */
303                 Datum           val;
304
305                 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
306                 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
307         }
308 }
309
310 /*
311  * Print a WHERE clause item
312  */
313 static void
314 print_where_clause_item(StringInfo s,
315                                                 Relation relation,
316                                                 HeapTuple tuple,
317                                                 int natt,
318                                                 bool *first_column)
319 {
320         Form_pg_attribute attr;
321         Datum           origval;
322         bool            isnull;
323         TupleDesc       tupdesc = RelationGetDescr(relation);
324
325         attr = TupleDescAttr(tupdesc, natt - 1);
326
327         /* Skip dropped columns and system columns */
328         if (attr->attisdropped || attr->attnum < 0)
329                 return;
330
331         /* Skip comma for first colums */
332         if (!*first_column)
333                 appendStringInfoString(s, " AND ");
334         else
335                 *first_column = false;
336
337         /* Print attribute name */
338         appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
339
340         /* Get Datum from tuple */
341         origval = heap_getattr(tuple, natt, tupdesc, &isnull);
342
343         /* Get output function */
344         print_value(s, origval, attr->atttypid, isnull);
345 }
346
347 /*
348  * Generate a WHERE clause for UPDATE or DELETE.
349  */
350 static void
351 print_where_clause(StringInfo s,
352                                    Relation relation,
353                                    HeapTuple oldtuple,
354                                    HeapTuple newtuple)
355 {
356         TupleDesc       tupdesc = RelationGetDescr(relation);
357         int                     natt;
358         bool            first_column = true;
359
360         Assert(relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
361                    relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
362                    relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
363
364         /* Build the WHERE clause */
365         appendStringInfoString(s, " WHERE ");
366
367         RelationGetIndexList(relation);
368         /* Generate WHERE clause using new values of REPLICA IDENTITY */
369         if (OidIsValid(relation->rd_replidindex))
370         {
371                 Relation        indexRel;
372                 int                     key;
373
374                 /* Use all the values associated with the index */
375                 indexRel = index_open(relation->rd_replidindex, AccessShareLock);
376                 for (key = 0; key < indexRel->rd_index->indnatts; key++)
377                 {
378                         int                     relattr = indexRel->rd_index->indkey.values[key];
379
380                         /*
381                          * For a relation having REPLICA IDENTITY set at DEFAULT or INDEX,
382                          * if one of the columns used for tuple selectivity is changed,
383                          * the old tuple data is not NULL and need to be used for tuple
384                          * selectivity. If no such columns are updated, old tuple data is
385                          * NULL.
386                          */
387                         print_where_clause_item(s, relation,
388                                                                         oldtuple ? oldtuple : newtuple,
389                                                                         relattr, &first_column);
390                 }
391                 index_close(indexRel, NoLock);
392                 return;
393         }
394
395         /* We need absolutely some values for tuple selectivity now */
396         Assert(oldtuple != NULL &&
397                    relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
398
399         /*
400          * Fallback to default case, use of old values and print WHERE clause
401          * using all the columns. This is actually the code path for FULL.
402          */
403         for (natt = 0; natt < tupdesc->natts; natt++)
404                 print_where_clause_item(s, relation, oldtuple,
405                                                                 natt + 1, &first_column);
406 }
407
408 /*
409  * Decode an INSERT entry
410  */
411 static void
412 decoder_raw_insert(StringInfo s,
413                                    Relation relation,
414                                    HeapTuple tuple)
415 {
416         TupleDesc       tupdesc = RelationGetDescr(relation);
417         int                     natt;
418         bool            first_column = true;
419         StringInfo      values = makeStringInfo();
420
421         /* Initialize string info for values */
422         initStringInfo(values);
423
424         /* Query header */
425         appendStringInfo(s, "INSERT INTO ");
426         print_relname(s, relation);
427         appendStringInfo(s, " (");
428
429         /* Build column names and values */
430         for (natt = 0; natt < tupdesc->natts; natt++)
431         {
432                 Form_pg_attribute attr;
433                 Datum           origval;
434                 bool            isnull;
435
436                 attr = TupleDescAttr(tupdesc, natt);
437
438                 /* Skip dropped columns and system columns */
439                 if (attr->attisdropped || attr->attnum < 0)
440                         continue;
441
442                 /* Skip comma for first colums */
443                 if (!first_column)
444                 {
445                         appendStringInfoString(s, ", ");
446                         appendStringInfoString(values, ", ");
447                 }
448                 else
449                         first_column = false;
450
451                 /* Print attribute name */
452                 appendStringInfo(s, "%s", quote_identifier(NameStr(attr->attname)));
453
454                 /* Get Datum from tuple */
455                 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
456
457                 /* Get output function */
458                 print_value(values, origval, attr->atttypid, isnull);
459         }
460
461         /* Append values  */
462         appendStringInfo(s, ") VALUES (%s);", values->data);
463
464         /* Clean up */
465         resetStringInfo(values);
466 }
467
468 /*
469  * Decode a DELETE entry
470  */
471 static void
472 decoder_raw_delete(StringInfo s,
473                                    Relation relation,
474                                    HeapTuple tuple)
475 {
476         appendStringInfo(s, "DELETE FROM ");
477         print_relname(s, relation);
478
479         /*
480          * Here the same tuple is used as old and new values, selectivity will be
481          * properly reduced by relation uses DEFAULT or INDEX as REPLICA IDENTITY.
482          */
483         print_where_clause(s, relation, tuple, tuple);
484         appendStringInfoString(s, ";");
485 }
486
487
488 /*
489  * Decode an UPDATE entry
490  */
491 static void
492 decoder_raw_update(StringInfo s,
493                                    Relation relation,
494                                    HeapTuple oldtuple,
495                                    HeapTuple newtuple)
496 {
497         TupleDesc       tupdesc = RelationGetDescr(relation);
498         int                     natt;
499         bool            first_column = true;
500
501         /* If there are no new values, simply leave as there is nothing to do */
502         if (newtuple == NULL)
503                 return;
504
505         appendStringInfo(s, "UPDATE ");
506         print_relname(s, relation);
507
508         /* Build the SET clause with the new values */
509         appendStringInfo(s, " SET ");
510         for (natt = 0; natt < tupdesc->natts; natt++)
511         {
512                 Form_pg_attribute attr;
513                 Datum           origval;
514                 bool            isnull;
515                 Oid                     typoutput;
516                 bool            typisvarlena;
517
518                 attr = TupleDescAttr(tupdesc, natt);
519
520                 /* Skip dropped columns and system columns */
521                 if (attr->attisdropped || attr->attnum < 0)
522                         continue;
523
524                 /* Get Datum from tuple */
525                 origval = heap_getattr(newtuple, natt + 1, tupdesc, &isnull);
526
527                 /* Get output type so we can know if it's varlena */
528                 getTypeOutputInfo(attr->atttypid,
529                                                   &typoutput, &typisvarlena);
530
531                 /*
532                  * TOASTed datum, but it is not changed so it can be skipped this in
533                  * the SET clause of this UPDATE query.
534                  */
535                 if (!isnull && typisvarlena &&
536                         VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
537                         continue;
538
539                 /* Skip comma for first colums */
540                 if (!first_column)
541                 {
542                         appendStringInfoString(s, ", ");
543                 }
544                 else
545                         first_column = false;
546
547                 /* Print attribute name */
548                 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
549
550                 /* Get output function */
551                 print_value(s, origval, attr->atttypid, isnull);
552         }
553
554         /* Print WHERE clause */
555         print_where_clause(s, relation, oldtuple, newtuple);
556
557         appendStringInfoString(s, ";");
558 }
559
560 /*
561  * Callback for individual changed tuples
562  */
563 static void
564 decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
565                                    Relation relation, ReorderBufferChange *change)
566 {
567         DecoderRawData *data;
568         MemoryContext old;
569         char            replident = relation->rd_rel->relreplident;
570         bool            is_rel_non_selective;
571
572         data = ctx->output_plugin_private;
573
574         /* Avoid leaking memory by using and resetting our own context */
575         old = MemoryContextSwitchTo(data->context);
576
577         /*
578          * Determine if relation is selective enough for WHERE clause generation
579          * in UPDATE and DELETE cases. A non-selective relation uses REPLICA
580          * IDENTITY set as NOTHING, or DEFAULT without an available replica
581          * identity index.
582          */
583         RelationGetIndexList(relation);
584         is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
585                                                         (replident == REPLICA_IDENTITY_DEFAULT &&
586                                                          !OidIsValid(relation->rd_replidindex)));
587
588         /* Decode entry depending on its type */
589         switch (change->action)
590         {
591                 case REORDER_BUFFER_CHANGE_INSERT:
592                         if (change->data.tp.newtuple != NULL)
593                         {
594                                 OutputPluginPrepareWrite(ctx, true);
595                                 decoder_raw_insert(ctx->out,
596                                                                    relation,
597                                                                    change->data.tp.newtuple);
598                                 OutputPluginWrite(ctx, true);
599                         }
600                         break;
601                 case REORDER_BUFFER_CHANGE_UPDATE:
602                         if (!is_rel_non_selective)
603                         {
604                                 HeapTuple       oldtuple = change->data.tp.oldtuple;
605                                 HeapTuple       newtuple = change->data.tp.newtuple;
606
607                                 OutputPluginPrepareWrite(ctx, true);
608                                 decoder_raw_update(ctx->out,
609                                                                    relation,
610                                                                    oldtuple,
611                                                                    newtuple);
612                                 OutputPluginWrite(ctx, true);
613                         }
614                         break;
615                 case REORDER_BUFFER_CHANGE_DELETE:
616                         if (!is_rel_non_selective)
617                         {
618                                 OutputPluginPrepareWrite(ctx, true);
619                                 decoder_raw_delete(ctx->out,
620                                                                    relation,
621                                                                    change->data.tp.oldtuple);
622                                 OutputPluginWrite(ctx, true);
623                         }
624                         break;
625                 default:
626                         /* Should not come here */
627                         Assert(0);
628                         break;
629         }
630
631         MemoryContextSwitchTo(old);
632         MemoryContextReset(data->context);
633 }