1 /*-------------------------------------------------------------------------
4 * Logical decoding output plugin generating SQL queries based
7 * Forked from decoder_raw by Michael Paquier
8 * https://github.com/michaelpq/pg_plugins/tree/main/decoder_raw
10 * Copyright (c) 1996-2025, PostgreSQL Global Development Group
15 *-------------------------------------------------------------------------
20 #include "access/genam.h"
21 #include "access/sysattr.h"
22 #include "catalog/pg_class.h"
23 #include "catalog/pg_type.h"
24 #include "nodes/parsenodes.h"
25 #include "replication/output_plugin.h"
26 #include "replication/logical.h"
27 #include "utils/builtins.h"
28 #include "utils/lsyscache.h"
29 #include "utils/memutils.h"
30 #include "utils/rel.h"
31 #include "utils/relcache.h"
32 #include "utils/syscache.h"
33 #include "utils/typcache.h"
38 /* These must be available to pg_dlsym() */
39 extern void _PG_init(void);
40 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
43 * Structure storing the plugin specifications and options.
47 MemoryContext context;
48 bool include_transaction;
51 static void wal2sql_startup(LogicalDecodingContext *ctx,
52 OutputPluginOptions *opt,
54 static void wal2sql_shutdown(LogicalDecodingContext *ctx);
55 static void wal2sql_begin_txn(LogicalDecodingContext *ctx,
56 ReorderBufferTXN *txn);
57 static void wal2sql_commit_txn(LogicalDecodingContext *ctx,
58 ReorderBufferTXN *txn,
59 XLogRecPtr commit_lsn);
60 static void wal2sql_change(LogicalDecodingContext *ctx,
61 ReorderBufferTXN *txn, Relation rel,
62 ReorderBufferChange *change);
67 /* other plugins can perform things here */
70 /* specify output plugin callbacks */
72 _PG_output_plugin_init(OutputPluginCallbacks *cb)
74 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
76 cb->startup_cb = wal2sql_startup;
77 cb->begin_cb = wal2sql_begin_txn;
78 cb->change_cb = wal2sql_change;
79 cb->commit_cb = wal2sql_commit_txn;
80 cb->shutdown_cb = wal2sql_shutdown;
84 /* initialize this plugin */
86 wal2sql_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
92 data = palloc(sizeof(Wal2SqlData));
93 data->context = AllocSetContextCreate(ctx->context,
95 ALLOCSET_DEFAULT_SIZES);
96 data->include_transaction = false;
98 ctx->output_plugin_private = data;
100 /* Default output format */
101 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
103 foreach(option, ctx->output_plugin_options)
105 DefElem *elem = lfirst(option);
107 Assert(elem->arg == NULL || IsA(elem->arg, String));
109 if (strcmp(elem->defname, "include_transaction") == 0)
111 /* if option does not provide a value, it means its value is true */
112 if (elem->arg == NULL)
113 data->include_transaction = true;
114 else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
116 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117 errmsg("could not parse value \"%s\" for parameter \"%s\"",
118 strVal(elem->arg), elem->defname)));
120 else if (strcmp(elem->defname, "output_format") == 0)
124 if (elem->arg == NULL)
126 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
127 errmsg("No value specified for parameter \"%s\"",
130 format = strVal(elem->arg);
132 if (strcmp(format, "textual") == 0)
133 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
134 else if (strcmp(format, "binary") == 0)
135 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
138 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
139 errmsg("Incorrect value \"%s\" for parameter \"%s\"",
140 format, elem->defname)));
145 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
146 errmsg("option \"%s\" = \"%s\" is unknown",
148 elem->arg ? strVal(elem->arg) : "(null)")));
153 /* cleanup this plugin's resources */
155 wal2sql_shutdown(LogicalDecodingContext *ctx)
157 Wal2SqlData *data = ctx->output_plugin_private;
159 /* cleanup our own resources via memory context reset */
160 MemoryContextDelete(data->context);
165 wal2sql_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
167 Wal2SqlData *data = ctx->output_plugin_private;
169 /* Write to the plugin only if there is */
170 if (data->include_transaction)
172 OutputPluginPrepareWrite(ctx, true);
173 appendStringInfoString(ctx->out, "BEGIN;");
174 OutputPluginWrite(ctx, true);
178 /* COMMIT callback */
180 wal2sql_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
181 XLogRecPtr commit_lsn)
183 Wal2SqlData *data = ctx->output_plugin_private;
185 /* Write to the plugin only if there is */
186 if (data->include_transaction)
188 OutputPluginPrepareWrite(ctx, true);
189 appendStringInfoString(ctx->out, "COMMIT;");
190 OutputPluginWrite(ctx, true);
195 * Print literal `outputstr' already represented as string of type `typid'
196 * into stringbuf `s'.
198 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
199 * if standard_conforming_strings were enabled.
202 print_literal(StringInfo s, Oid typid, char *outputstr)
209 if (outputstr[0] == 't')
210 appendStringInfoString(s, "true");
212 appendStringInfoString(s, "false");
219 /* NB: We don't care about Inf, NaN et al. */
220 appendStringInfoString(s, outputstr);
227 * Numeric can have NaN. Float can have Nan, Infinity and
228 * -Infinity. These need to be quoted.
230 if (strcmp(outputstr, "NaN") == 0 ||
231 strcmp(outputstr, "Infinity") == 0 ||
232 strcmp(outputstr, "-Infinity") == 0)
233 appendStringInfo(s, "'%s'", outputstr);
235 appendStringInfoString(s, outputstr);
239 appendStringInfo(s, "B'%s'", outputstr);
243 appendStringInfoChar(s, '\'');
244 for (valptr = outputstr; *valptr; valptr++)
248 if (SQL_STR_DOUBLE(ch, false))
249 appendStringInfoChar(s, ch);
250 appendStringInfoChar(s, ch);
252 appendStringInfoChar(s, '\'');
258 * Print a relation name into the StringInfo provided by caller.
261 print_relname(StringInfo s, Relation rel)
263 Form_pg_class class_form = RelationGetForm(rel);
265 appendStringInfoString(s,
266 quote_qualified_identifier(
268 get_rel_namespace(RelationGetRelid(rel))),
269 NameStr(class_form->relname)));
273 * Print a value into the StringInfo provided by caller.
276 print_value(StringInfo s, Datum origval, Oid typid, bool isnull)
281 /* Query output function */
282 getTypeOutputInfo(typid,
283 &typoutput, &typisvarlena);
287 appendStringInfoString(s, "null");
288 else if (typisvarlena &&
289 VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
292 * This should not happen, the column and its value can be skipped
293 * properly. This code is let on purpose to avoid any traps in this
294 * area in the future, generating useless queries in non-assert
298 appendStringInfoString(s, "unchanged-toast-datum");
300 else if (!typisvarlena)
301 print_literal(s, typid,
302 OidOutputFunctionCall(typoutput, origval));
305 /* Definitely detoasted Datum */
308 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
309 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
314 * Print a WHERE clause item
317 print_where_clause_item(StringInfo s,
323 Form_pg_attribute attr;
326 TupleDesc tupdesc = RelationGetDescr(relation);
328 attr = TupleDescAttr(tupdesc, natt - 1);
330 /* Skip dropped columns and system columns */
331 if (attr->attisdropped || attr->attnum < 0)
334 /* Skip comma for first colums */
336 appendStringInfoString(s, " AND ");
338 *first_column = false;
340 /* Print attribute name */
341 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
343 /* Get Datum from tuple */
344 origval = heap_getattr(tuple, natt, tupdesc, &isnull);
346 /* Get output function */
347 print_value(s, origval, attr->atttypid, isnull);
351 * Generate a WHERE clause for UPDATE or DELETE.
354 print_where_clause(StringInfo s,
359 TupleDesc tupdesc = RelationGetDescr(relation);
361 bool first_column = true;
363 Assert(relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
364 relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
365 relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
367 /* Build the WHERE clause */
368 appendStringInfoString(s, " WHERE ");
370 RelationGetIndexList(relation);
371 /* Generate WHERE clause using new values of REPLICA IDENTITY */
372 if (OidIsValid(relation->rd_replidindex))
377 /* Use all the values associated with the index */
378 indexRel = index_open(relation->rd_replidindex, AccessShareLock);
379 for (key = 0; key < indexRel->rd_index->indnatts; key++)
381 int relattr = indexRel->rd_index->indkey.values[key];
384 * For a relation having REPLICA IDENTITY set at DEFAULT or INDEX,
385 * if one of the columns used for tuple selectivity is changed,
386 * the old tuple data is not NULL and need to be used for tuple
387 * selectivity. If no such columns are updated, old tuple data is
390 print_where_clause_item(s, relation,
391 oldtuple ? oldtuple : newtuple,
392 relattr, &first_column);
394 index_close(indexRel, NoLock);
398 /* We need absolutely some values for tuple selectivity now */
399 Assert(oldtuple != NULL &&
400 relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
403 * Fallback to default case, use of old values and print WHERE clause
404 * using all the columns. This is actually the code path for FULL.
406 for (natt = 0; natt < tupdesc->natts; natt++)
407 print_where_clause_item(s, relation, oldtuple,
408 natt + 1, &first_column);
412 * Decode an INSERT entry
415 wal2sql_insert(StringInfo s,
419 TupleDesc tupdesc = RelationGetDescr(relation);
421 bool first_column = true;
422 StringInfo values = makeStringInfo();
424 /* Initialize string info for values */
425 initStringInfo(values);
428 appendStringInfo(s, "INSERT INTO ");
429 print_relname(s, relation);
430 appendStringInfo(s, " (");
432 /* Build column names and values */
433 for (natt = 0; natt < tupdesc->natts; natt++)
435 Form_pg_attribute attr;
439 attr = TupleDescAttr(tupdesc, natt);
441 /* Skip dropped columns and system columns */
442 if (attr->attisdropped || attr->attnum < 0)
445 /* Skip comma for first colums */
448 appendStringInfoString(s, ", ");
449 appendStringInfoString(values, ", ");
452 first_column = false;
454 /* Print attribute name */
455 appendStringInfo(s, "%s", quote_identifier(NameStr(attr->attname)));
457 /* Get Datum from tuple */
458 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
460 /* Get output function */
461 print_value(values, origval, attr->atttypid, isnull);
465 appendStringInfo(s, ") VALUES (%s);", values->data);
468 resetStringInfo(values);
472 * Decode a DELETE entry
475 wal2sql_delete(StringInfo s,
479 appendStringInfo(s, "DELETE FROM ");
480 print_relname(s, relation);
483 * Here the same tuple is used as old and new values, selectivity will be
484 * properly reduced by relation uses DEFAULT or INDEX as REPLICA IDENTITY.
486 print_where_clause(s, relation, tuple, tuple);
487 appendStringInfoString(s, ";");
492 * Decode an UPDATE entry
495 wal2sql_update(StringInfo s,
500 TupleDesc tupdesc = RelationGetDescr(relation);
502 bool first_column = true;
504 /* If there are no new values, simply leave as there is nothing to do */
505 if (newtuple == NULL)
508 appendStringInfo(s, "UPDATE ");
509 print_relname(s, relation);
511 /* Build the SET clause with the new values */
512 appendStringInfo(s, " SET ");
513 for (natt = 0; natt < tupdesc->natts; natt++)
515 Form_pg_attribute attr;
521 attr = TupleDescAttr(tupdesc, natt);
523 /* Skip dropped columns and system columns */
524 if (attr->attisdropped || attr->attnum < 0)
527 /* Get Datum from tuple */
528 origval = heap_getattr(newtuple, natt + 1, tupdesc, &isnull);
530 /* Get output type so we can know if it's varlena */
531 getTypeOutputInfo(attr->atttypid,
532 &typoutput, &typisvarlena);
535 * TOASTed datum, but it is not changed so it can be skipped this in
536 * the SET clause of this UPDATE query.
538 if (!isnull && typisvarlena &&
539 VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
542 /* Skip comma for first colums */
545 appendStringInfoString(s, ", ");
548 first_column = false;
550 /* Print attribute name */
551 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
553 /* Get output function */
554 print_value(s, origval, attr->atttypid, isnull);
557 /* Print WHERE clause */
558 print_where_clause(s, relation, oldtuple, newtuple);
560 appendStringInfoString(s, ";");
564 * Callback for individual changed tuples
567 wal2sql_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
568 Relation relation, ReorderBufferChange *change)
572 char replident = relation->rd_rel->relreplident;
573 bool is_rel_non_selective;
575 data = ctx->output_plugin_private;
577 /* Avoid leaking memory by using and resetting our own context */
578 old = MemoryContextSwitchTo(data->context);
581 * Determine if relation is selective enough for WHERE clause generation
582 * in UPDATE and DELETE cases. A non-selective relation uses REPLICA
583 * IDENTITY set as NOTHING, or DEFAULT without an available replica
586 RelationGetIndexList(relation);
587 is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
588 (replident == REPLICA_IDENTITY_DEFAULT &&
589 !OidIsValid(relation->rd_replidindex)));
591 /* Decode entry depending on its type */
592 switch (change->action)
594 case REORDER_BUFFER_CHANGE_INSERT:
595 if (change->data.tp.newtuple != NULL)
597 OutputPluginPrepareWrite(ctx, true);
598 wal2sql_insert(ctx->out,
600 change->data.tp.newtuple);
601 OutputPluginWrite(ctx, true);
604 case REORDER_BUFFER_CHANGE_UPDATE:
605 if (!is_rel_non_selective)
607 HeapTuple oldtuple = change->data.tp.oldtuple;
608 HeapTuple newtuple = change->data.tp.newtuple;
610 OutputPluginPrepareWrite(ctx, true);
611 wal2sql_update(ctx->out,
615 OutputPluginWrite(ctx, true);
618 case REORDER_BUFFER_CHANGE_DELETE:
619 if (!is_rel_non_selective)
621 OutputPluginPrepareWrite(ctx, true);
622 wal2sql_delete(ctx->out,
624 change->data.tp.oldtuple);
625 OutputPluginWrite(ctx, true);
629 /* Should not come here */
634 MemoryContextSwitchTo(old);
635 MemoryContextReset(data->context);