1 /*-------------------------------------------------------------------------
4 * Logical decoding output plugin generating SQL queries based
7 * Forked from decoder_raw by Michael Paquier
8 * https://github.com/michaelpq/pg_plugins/tree/main/decoder_raw
10 * Copyright (c) 1996-2025, PostgreSQL Global Development Group
15 *-------------------------------------------------------------------------
20 #include "access/genam.h"
21 #include "access/sysattr.h"
22 #include "catalog/pg_class.h"
23 #include "catalog/pg_type.h"
24 #include "nodes/parsenodes.h"
25 #include "replication/output_plugin.h"
26 #include "replication/logical.h"
27 #include "utils/builtins.h"
28 #include "utils/lsyscache.h"
29 #include "utils/memutils.h"
30 #include "utils/rel.h"
31 #include "utils/relcache.h"
32 #include "utils/syscache.h"
33 #include "utils/typcache.h"
38 /* These must be available to pg_dlsym() */
39 extern void _PG_init(void);
40 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
43 * Structure storing the plugin specifications and options.
47 MemoryContext context;
48 bool include_transaction;
51 static void wal2sql_startup(LogicalDecodingContext *ctx,
52 OutputPluginOptions *opt,
54 static void wal2sql_shutdown(LogicalDecodingContext *ctx);
55 static void wal2sql_begin_txn(LogicalDecodingContext *ctx,
56 ReorderBufferTXN *txn);
57 static void wal2sql_commit_txn(LogicalDecodingContext *ctx,
58 ReorderBufferTXN *txn,
59 XLogRecPtr commit_lsn);
60 static void wal2sql_change(LogicalDecodingContext *ctx,
61 ReorderBufferTXN *txn, Relation rel,
62 ReorderBufferChange *change);
63 static void wal2sql_message(LogicalDecodingContext *ctx,
64 ReorderBufferTXN *txn,
65 XLogRecPtr message_lsn,
70 static void wal2sql_truncate(LogicalDecodingContext *ctx,
71 ReorderBufferTXN *txn,
74 ReorderBufferChange *change);
79 /* other plugins can perform things here */
82 /* specify output plugin callbacks */
84 _PG_output_plugin_init(OutputPluginCallbacks *cb)
86 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
88 cb->startup_cb = wal2sql_startup;
89 cb->begin_cb = wal2sql_begin_txn;
90 cb->change_cb = wal2sql_change;
91 cb->commit_cb = wal2sql_commit_txn;
92 cb->message_cb = wal2sql_message;
93 cb->truncate_cb = wal2sql_truncate;
94 cb->shutdown_cb = wal2sql_shutdown;
98 /* initialize this plugin */
100 wal2sql_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
106 data = palloc(sizeof(Wal2SqlData));
107 data->context = AllocSetContextCreate(ctx->context,
109 ALLOCSET_DEFAULT_SIZES);
110 data->include_transaction = false;
112 ctx->output_plugin_private = data;
114 /* Default output format */
115 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
117 foreach(option, ctx->output_plugin_options)
119 DefElem *elem = lfirst(option);
121 Assert(elem->arg == NULL || IsA(elem->arg, String));
123 if (strcmp(elem->defname, "include_transaction") == 0)
125 /* if option does not provide a value, it means its value is true */
126 if (elem->arg == NULL)
127 data->include_transaction = true;
128 else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
130 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
131 errmsg("could not parse value \"%s\" for parameter \"%s\"",
132 strVal(elem->arg), elem->defname)));
134 else if (strcmp(elem->defname, "output_format") == 0)
138 if (elem->arg == NULL)
140 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
141 errmsg("No value specified for parameter \"%s\"",
144 format = strVal(elem->arg);
146 if (strcmp(format, "textual") == 0)
147 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
148 else if (strcmp(format, "binary") == 0)
149 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
152 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
153 errmsg("Incorrect value \"%s\" for parameter \"%s\"",
154 format, elem->defname)));
159 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
160 errmsg("option \"%s\" = \"%s\" is unknown",
162 elem->arg ? strVal(elem->arg) : "(null)")));
167 /* cleanup this plugin's resources */
169 wal2sql_shutdown(LogicalDecodingContext *ctx)
171 Wal2SqlData *data = ctx->output_plugin_private;
173 /* cleanup our own resources via memory context reset */
174 MemoryContextDelete(data->context);
179 wal2sql_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
181 Wal2SqlData *data = ctx->output_plugin_private;
183 /* Write to the plugin only if there is */
184 if (data->include_transaction)
186 OutputPluginPrepareWrite(ctx, true);
187 appendStringInfoString(ctx->out, "BEGIN;");
188 OutputPluginWrite(ctx, true);
192 /* COMMIT callback */
194 wal2sql_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
195 XLogRecPtr commit_lsn)
197 Wal2SqlData *data = ctx->output_plugin_private;
199 /* Write to the plugin only if there is */
200 if (data->include_transaction)
202 OutputPluginPrepareWrite(ctx, true);
203 appendStringInfoString(ctx->out, "COMMIT;");
204 OutputPluginWrite(ctx, true);
209 * Print literal `outputstr' already represented as string of type `typid'
210 * into stringbuf `s'.
212 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
213 * if standard_conforming_strings were enabled.
216 print_literal(StringInfo s, Oid typid, char *outputstr)
223 if (outputstr[0] == 't')
224 appendStringInfoString(s, "true");
226 appendStringInfoString(s, "false");
233 /* NB: We don't care about Inf, NaN et al. */
234 appendStringInfoString(s, outputstr);
241 * Numeric can have NaN. Float can have Nan, Infinity and
242 * -Infinity. These need to be quoted.
244 if (strcmp(outputstr, "NaN") == 0 ||
245 strcmp(outputstr, "Infinity") == 0 ||
246 strcmp(outputstr, "-Infinity") == 0)
247 appendStringInfo(s, "'%s'", outputstr);
249 appendStringInfoString(s, outputstr);
253 appendStringInfo(s, "B'%s'", outputstr);
257 appendStringInfoChar(s, '\'');
258 for (valptr = outputstr; *valptr; valptr++)
262 if (SQL_STR_DOUBLE(ch, false))
263 appendStringInfoChar(s, ch);
264 appendStringInfoChar(s, ch);
266 appendStringInfoChar(s, '\'');
272 * Print a relation name into the StringInfo provided by caller.
275 print_relname(StringInfo s, Relation rel)
277 Form_pg_class class_form = RelationGetForm(rel);
279 appendStringInfoString(s,
280 quote_qualified_identifier(
282 get_rel_namespace(RelationGetRelid(rel))),
283 NameStr(class_form->relname)));
287 * Print a value into the StringInfo provided by caller.
290 print_value(StringInfo s, Datum origval, Oid typid, bool isnull)
295 /* Query output function */
296 getTypeOutputInfo(typid,
297 &typoutput, &typisvarlena);
301 appendStringInfoString(s, "null");
302 else if (typisvarlena &&
303 VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
306 * This should not happen, the column and its value can be skipped
307 * properly. This code is let on purpose to avoid any traps in this
308 * area in the future, generating useless queries in non-assert
312 appendStringInfoString(s, "unchanged-toast-datum");
314 else if (!typisvarlena)
315 print_literal(s, typid,
316 OidOutputFunctionCall(typoutput, origval));
319 /* Definitely detoasted Datum */
322 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
323 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
328 * Print a WHERE clause item
331 print_where_clause_item(StringInfo s,
337 Form_pg_attribute attr;
340 TupleDesc tupdesc = RelationGetDescr(relation);
342 attr = TupleDescAttr(tupdesc, natt - 1);
344 /* Skip dropped columns and system columns */
345 if (attr->attisdropped || attr->attnum < 0)
348 /* Skip comma for first colums */
350 appendStringInfoString(s, " AND ");
352 *first_column = false;
354 /* Print attribute name */
355 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
357 /* Get Datum from tuple */
358 origval = heap_getattr(tuple, natt, tupdesc, &isnull);
360 /* Get output function */
361 print_value(s, origval, attr->atttypid, isnull);
365 * Generate a WHERE clause for UPDATE or DELETE.
368 print_where_clause(StringInfo s,
373 TupleDesc tupdesc = RelationGetDescr(relation);
375 bool first_column = true;
377 Assert(relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
378 relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
379 relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
381 /* Build the WHERE clause */
382 appendStringInfoString(s, " WHERE ");
384 RelationGetIndexList(relation);
385 /* Generate WHERE clause using new values of REPLICA IDENTITY */
386 if (OidIsValid(relation->rd_replidindex))
391 /* Use all the values associated with the index */
392 indexRel = index_open(relation->rd_replidindex, AccessShareLock);
393 for (key = 0; key < indexRel->rd_index->indnatts; key++)
395 int relattr = indexRel->rd_index->indkey.values[key];
398 * For a relation having REPLICA IDENTITY set at DEFAULT or INDEX,
399 * if one of the columns used for tuple selectivity is changed,
400 * the old tuple data is not NULL and need to be used for tuple
401 * selectivity. If no such columns are updated, old tuple data is
404 print_where_clause_item(s, relation,
405 oldtuple ? oldtuple : newtuple,
406 relattr, &first_column);
408 index_close(indexRel, NoLock);
412 /* We need absolutely some values for tuple selectivity now */
413 Assert(oldtuple != NULL &&
414 relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
417 * Fallback to default case, use of old values and print WHERE clause
418 * using all the columns. This is actually the code path for FULL.
420 for (natt = 0; natt < tupdesc->natts; natt++)
421 print_where_clause_item(s, relation, oldtuple,
422 natt + 1, &first_column);
426 * Decode an INSERT entry
429 wal2sql_insert(StringInfo s,
433 TupleDesc tupdesc = RelationGetDescr(relation);
435 bool first_column = true;
436 StringInfo values = makeStringInfo();
438 /* Initialize string info for values */
439 initStringInfo(values);
442 appendStringInfo(s, "INSERT INTO ");
443 print_relname(s, relation);
444 appendStringInfo(s, " (");
446 /* Build column names and values */
447 for (natt = 0; natt < tupdesc->natts; natt++)
449 Form_pg_attribute attr;
453 attr = TupleDescAttr(tupdesc, natt);
455 /* Skip dropped columns and system columns */
456 if (attr->attisdropped || attr->attnum < 0)
459 /* Skip comma for first colums */
462 appendStringInfoString(s, ", ");
463 appendStringInfoString(values, ", ");
466 first_column = false;
468 /* Print attribute name */
469 appendStringInfo(s, "%s", quote_identifier(NameStr(attr->attname)));
471 /* Get Datum from tuple */
472 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
474 /* Get output function */
475 print_value(values, origval, attr->atttypid, isnull);
479 appendStringInfo(s, ") VALUES (%s);", values->data);
482 resetStringInfo(values);
486 * Decode a DELETE entry
489 wal2sql_delete(StringInfo s,
493 appendStringInfo(s, "DELETE FROM ");
494 print_relname(s, relation);
497 * Here the same tuple is used as old and new values, selectivity will be
498 * properly reduced by relation uses DEFAULT or INDEX as REPLICA IDENTITY.
500 print_where_clause(s, relation, tuple, tuple);
501 appendStringInfoString(s, ";");
506 * Decode an UPDATE entry
509 wal2sql_update(StringInfo s,
514 TupleDesc tupdesc = RelationGetDescr(relation);
516 bool first_column = true;
518 /* If there are no new values, simply leave as there is nothing to do */
519 if (newtuple == NULL)
522 appendStringInfo(s, "UPDATE ");
523 print_relname(s, relation);
525 /* Build the SET clause with the new values */
526 appendStringInfo(s, " SET ");
527 for (natt = 0; natt < tupdesc->natts; natt++)
529 Form_pg_attribute attr;
535 attr = TupleDescAttr(tupdesc, natt);
537 /* Skip dropped columns and system columns */
538 if (attr->attisdropped || attr->attnum < 0)
541 /* Get Datum from tuple */
542 origval = heap_getattr(newtuple, natt + 1, tupdesc, &isnull);
544 /* Get output type so we can know if it's varlena */
545 getTypeOutputInfo(attr->atttypid,
546 &typoutput, &typisvarlena);
549 * TOASTed datum, but it is not changed so it can be skipped this in
550 * the SET clause of this UPDATE query.
552 if (!isnull && typisvarlena &&
553 VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
556 /* Skip comma for first colums */
559 appendStringInfoString(s, ", ");
562 first_column = false;
564 /* Print attribute name */
565 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
567 /* Get output function */
568 print_value(s, origval, attr->atttypid, isnull);
571 /* Print WHERE clause */
572 print_where_clause(s, relation, oldtuple, newtuple);
574 appendStringInfoString(s, ";");
578 * Callback for individual changed tuples
581 wal2sql_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
582 Relation relation, ReorderBufferChange *change)
586 char replident = relation->rd_rel->relreplident;
587 bool is_rel_non_selective;
589 data = ctx->output_plugin_private;
591 /* Avoid leaking memory by using and resetting our own context */
592 old = MemoryContextSwitchTo(data->context);
595 * Determine if relation is selective enough for WHERE clause generation
596 * in UPDATE and DELETE cases. A non-selective relation uses REPLICA
597 * IDENTITY set as NOTHING, or DEFAULT without an available replica
600 RelationGetIndexList(relation);
601 is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
602 (replident == REPLICA_IDENTITY_DEFAULT &&
603 !OidIsValid(relation->rd_replidindex)));
605 /* Decode entry depending on its type */
606 switch (change->action)
608 case REORDER_BUFFER_CHANGE_INSERT:
609 if (change->data.tp.newtuple != NULL)
611 OutputPluginPrepareWrite(ctx, true);
612 wal2sql_insert(ctx->out,
614 change->data.tp.newtuple);
615 OutputPluginWrite(ctx, true);
618 case REORDER_BUFFER_CHANGE_UPDATE:
619 if (!is_rel_non_selective)
621 HeapTuple oldtuple = change->data.tp.oldtuple;
622 HeapTuple newtuple = change->data.tp.newtuple;
624 OutputPluginPrepareWrite(ctx, true);
625 wal2sql_update(ctx->out,
629 OutputPluginWrite(ctx, true);
632 case REORDER_BUFFER_CHANGE_DELETE:
633 if (!is_rel_non_selective)
635 OutputPluginPrepareWrite(ctx, true);
636 wal2sql_delete(ctx->out,
638 change->data.tp.oldtuple);
639 OutputPluginWrite(ctx, true);
643 /* Should not come here */
648 MemoryContextSwitchTo(old);
649 MemoryContextReset(data->context);
653 * Callback for generic logical decoding messages.
655 * This callback handles messages emitted via pg_logical_emit_message().
656 * We use this to capture DDL commands that are emitted by event triggers.
657 * The event triggers emit messages with prefix 'ddl' containing the DDL
661 wal2sql_message(LogicalDecodingContext *ctx,
662 ReorderBufferTXN *txn,
663 XLogRecPtr message_lsn,
670 * Only process messages with the 'ddl' prefix. These are DDL statements
671 * emitted by our event triggers using pg_logical_emit_message().
673 if (strcmp(prefix, "ddl") != 0)
677 * Output the DDL statement. The message already contains the complete
678 * DDL command in executable SQL format (e.g., "ALTER TABLE foo ADD COLUMN bar TEXT;")
679 * so we output it as-is.
681 * Use appendBinaryStringInfo to properly handle the message with its size,
682 * as the message may not be null-terminated.
684 OutputPluginPrepareWrite(ctx, true);
685 appendBinaryStringInfo(ctx->out, message, message_size);
686 OutputPluginWrite(ctx, true);
690 * Callback for TRUNCATE operations.
692 * TRUNCATE is a DML operation and does NOT trigger DDL event triggers,
693 * so it must be captured via logical replication using this callback.
694 * The callback receives an array of relations since TRUNCATE can affect
695 * multiple tables (e.g., via foreign key cascades or explicit multi-table syntax).
698 wal2sql_truncate(LogicalDecodingContext *ctx,
699 ReorderBufferTXN *txn,
701 Relation relations[],
702 ReorderBufferChange *change)
708 data = ctx->output_plugin_private;
710 /* Avoid leaking memory by using and resetting our own context */
711 old = MemoryContextSwitchTo(data->context);
713 OutputPluginPrepareWrite(ctx, true);
715 /* Output TRUNCATE statement */
716 appendStringInfoString(ctx->out, "TRUNCATE TABLE ");
718 /* Add all table names, separated by commas */
719 for (i = 0; i < nrelations; i++)
722 appendStringInfoString(ctx->out, ", ");
724 print_relname(ctx->out, relations[i]);
727 appendStringInfoString(ctx->out, ";");
729 OutputPluginWrite(ctx, true);
731 MemoryContextSwitchTo(old);
732 MemoryContextReset(data->context);