1 /*-------------------------------------------------------------------------
4 * Logical decoding output plugin generating SQL queries based
7 * Forked from decoder_raw by Michael Paquier
8 * https://github.com/michaelpq/pg_plugins/tree/main/decoder_raw
10 * Copyright (c) 1996-2025, PostgreSQL Global Development Group
15 *-------------------------------------------------------------------------
20 #include "access/genam.h"
21 #include "access/sysattr.h"
22 #include "catalog/pg_class.h"
23 #include "catalog/pg_type.h"
24 #include "nodes/parsenodes.h"
25 #include "replication/output_plugin.h"
26 #include "replication/logical.h"
27 #include "utils/builtins.h"
28 #include "utils/lsyscache.h"
29 #include "utils/memutils.h"
30 #include "utils/rel.h"
31 #include "utils/relcache.h"
32 #include "utils/syscache.h"
33 #include "utils/typcache.h"
38 /* These must be available to pg_dlsym() */
39 extern void _PG_init(void);
40 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
43 * Structure storing the plugin specifications and options.
47 MemoryContext context;
48 bool include_transaction;
51 static void wal2sql_startup(LogicalDecodingContext *ctx,
52 OutputPluginOptions *opt,
54 static void wal2sql_shutdown(LogicalDecodingContext *ctx);
55 static void wal2sql_begin_txn(LogicalDecodingContext *ctx,
56 ReorderBufferTXN *txn);
57 static void wal2sql_commit_txn(LogicalDecodingContext *ctx,
58 ReorderBufferTXN *txn,
59 XLogRecPtr commit_lsn);
60 static void wal2sql_change(LogicalDecodingContext *ctx,
61 ReorderBufferTXN *txn, Relation rel,
62 ReorderBufferChange *change);
63 static void wal2sql_message(LogicalDecodingContext *ctx,
64 ReorderBufferTXN *txn,
65 XLogRecPtr message_lsn,
74 /* other plugins can perform things here */
77 /* specify output plugin callbacks */
79 _PG_output_plugin_init(OutputPluginCallbacks *cb)
81 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
83 cb->startup_cb = wal2sql_startup;
84 cb->begin_cb = wal2sql_begin_txn;
85 cb->change_cb = wal2sql_change;
86 cb->commit_cb = wal2sql_commit_txn;
87 cb->message_cb = wal2sql_message;
88 cb->shutdown_cb = wal2sql_shutdown;
92 /* initialize this plugin */
94 wal2sql_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
100 data = palloc(sizeof(Wal2SqlData));
101 data->context = AllocSetContextCreate(ctx->context,
103 ALLOCSET_DEFAULT_SIZES);
104 data->include_transaction = false;
106 ctx->output_plugin_private = data;
108 /* Default output format */
109 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
111 foreach(option, ctx->output_plugin_options)
113 DefElem *elem = lfirst(option);
115 Assert(elem->arg == NULL || IsA(elem->arg, String));
117 if (strcmp(elem->defname, "include_transaction") == 0)
119 /* if option does not provide a value, it means its value is true */
120 if (elem->arg == NULL)
121 data->include_transaction = true;
122 else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
124 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
125 errmsg("could not parse value \"%s\" for parameter \"%s\"",
126 strVal(elem->arg), elem->defname)));
128 else if (strcmp(elem->defname, "output_format") == 0)
132 if (elem->arg == NULL)
134 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
135 errmsg("No value specified for parameter \"%s\"",
138 format = strVal(elem->arg);
140 if (strcmp(format, "textual") == 0)
141 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
142 else if (strcmp(format, "binary") == 0)
143 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
146 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
147 errmsg("Incorrect value \"%s\" for parameter \"%s\"",
148 format, elem->defname)));
153 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
154 errmsg("option \"%s\" = \"%s\" is unknown",
156 elem->arg ? strVal(elem->arg) : "(null)")));
161 /* cleanup this plugin's resources */
163 wal2sql_shutdown(LogicalDecodingContext *ctx)
165 Wal2SqlData *data = ctx->output_plugin_private;
167 /* cleanup our own resources via memory context reset */
168 MemoryContextDelete(data->context);
173 wal2sql_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
175 Wal2SqlData *data = ctx->output_plugin_private;
177 /* Write to the plugin only if there is */
178 if (data->include_transaction)
180 OutputPluginPrepareWrite(ctx, true);
181 appendStringInfoString(ctx->out, "BEGIN;");
182 OutputPluginWrite(ctx, true);
186 /* COMMIT callback */
188 wal2sql_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
189 XLogRecPtr commit_lsn)
191 Wal2SqlData *data = ctx->output_plugin_private;
193 /* Write to the plugin only if there is */
194 if (data->include_transaction)
196 OutputPluginPrepareWrite(ctx, true);
197 appendStringInfoString(ctx->out, "COMMIT;");
198 OutputPluginWrite(ctx, true);
203 * Print literal `outputstr' already represented as string of type `typid'
204 * into stringbuf `s'.
206 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
207 * if standard_conforming_strings were enabled.
210 print_literal(StringInfo s, Oid typid, char *outputstr)
217 if (outputstr[0] == 't')
218 appendStringInfoString(s, "true");
220 appendStringInfoString(s, "false");
227 /* NB: We don't care about Inf, NaN et al. */
228 appendStringInfoString(s, outputstr);
235 * Numeric can have NaN. Float can have Nan, Infinity and
236 * -Infinity. These need to be quoted.
238 if (strcmp(outputstr, "NaN") == 0 ||
239 strcmp(outputstr, "Infinity") == 0 ||
240 strcmp(outputstr, "-Infinity") == 0)
241 appendStringInfo(s, "'%s'", outputstr);
243 appendStringInfoString(s, outputstr);
247 appendStringInfo(s, "B'%s'", outputstr);
251 appendStringInfoChar(s, '\'');
252 for (valptr = outputstr; *valptr; valptr++)
256 if (SQL_STR_DOUBLE(ch, false))
257 appendStringInfoChar(s, ch);
258 appendStringInfoChar(s, ch);
260 appendStringInfoChar(s, '\'');
266 * Print a relation name into the StringInfo provided by caller.
269 print_relname(StringInfo s, Relation rel)
271 Form_pg_class class_form = RelationGetForm(rel);
273 appendStringInfoString(s,
274 quote_qualified_identifier(
276 get_rel_namespace(RelationGetRelid(rel))),
277 NameStr(class_form->relname)));
281 * Print a value into the StringInfo provided by caller.
284 print_value(StringInfo s, Datum origval, Oid typid, bool isnull)
289 /* Query output function */
290 getTypeOutputInfo(typid,
291 &typoutput, &typisvarlena);
295 appendStringInfoString(s, "null");
296 else if (typisvarlena &&
297 VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
300 * This should not happen, the column and its value can be skipped
301 * properly. This code is let on purpose to avoid any traps in this
302 * area in the future, generating useless queries in non-assert
306 appendStringInfoString(s, "unchanged-toast-datum");
308 else if (!typisvarlena)
309 print_literal(s, typid,
310 OidOutputFunctionCall(typoutput, origval));
313 /* Definitely detoasted Datum */
316 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
317 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
322 * Print a WHERE clause item
325 print_where_clause_item(StringInfo s,
331 Form_pg_attribute attr;
334 TupleDesc tupdesc = RelationGetDescr(relation);
336 attr = TupleDescAttr(tupdesc, natt - 1);
338 /* Skip dropped columns and system columns */
339 if (attr->attisdropped || attr->attnum < 0)
342 /* Skip comma for first colums */
344 appendStringInfoString(s, " AND ");
346 *first_column = false;
348 /* Print attribute name */
349 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
351 /* Get Datum from tuple */
352 origval = heap_getattr(tuple, natt, tupdesc, &isnull);
354 /* Get output function */
355 print_value(s, origval, attr->atttypid, isnull);
359 * Generate a WHERE clause for UPDATE or DELETE.
362 print_where_clause(StringInfo s,
367 TupleDesc tupdesc = RelationGetDescr(relation);
369 bool first_column = true;
371 Assert(relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
372 relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
373 relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
375 /* Build the WHERE clause */
376 appendStringInfoString(s, " WHERE ");
378 RelationGetIndexList(relation);
379 /* Generate WHERE clause using new values of REPLICA IDENTITY */
380 if (OidIsValid(relation->rd_replidindex))
385 /* Use all the values associated with the index */
386 indexRel = index_open(relation->rd_replidindex, AccessShareLock);
387 for (key = 0; key < indexRel->rd_index->indnatts; key++)
389 int relattr = indexRel->rd_index->indkey.values[key];
392 * For a relation having REPLICA IDENTITY set at DEFAULT or INDEX,
393 * if one of the columns used for tuple selectivity is changed,
394 * the old tuple data is not NULL and need to be used for tuple
395 * selectivity. If no such columns are updated, old tuple data is
398 print_where_clause_item(s, relation,
399 oldtuple ? oldtuple : newtuple,
400 relattr, &first_column);
402 index_close(indexRel, NoLock);
406 /* We need absolutely some values for tuple selectivity now */
407 Assert(oldtuple != NULL &&
408 relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
411 * Fallback to default case, use of old values and print WHERE clause
412 * using all the columns. This is actually the code path for FULL.
414 for (natt = 0; natt < tupdesc->natts; natt++)
415 print_where_clause_item(s, relation, oldtuple,
416 natt + 1, &first_column);
420 * Decode an INSERT entry
423 wal2sql_insert(StringInfo s,
427 TupleDesc tupdesc = RelationGetDescr(relation);
429 bool first_column = true;
430 StringInfo values = makeStringInfo();
432 /* Initialize string info for values */
433 initStringInfo(values);
436 appendStringInfo(s, "INSERT INTO ");
437 print_relname(s, relation);
438 appendStringInfo(s, " (");
440 /* Build column names and values */
441 for (natt = 0; natt < tupdesc->natts; natt++)
443 Form_pg_attribute attr;
447 attr = TupleDescAttr(tupdesc, natt);
449 /* Skip dropped columns and system columns */
450 if (attr->attisdropped || attr->attnum < 0)
453 /* Skip comma for first colums */
456 appendStringInfoString(s, ", ");
457 appendStringInfoString(values, ", ");
460 first_column = false;
462 /* Print attribute name */
463 appendStringInfo(s, "%s", quote_identifier(NameStr(attr->attname)));
465 /* Get Datum from tuple */
466 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
468 /* Get output function */
469 print_value(values, origval, attr->atttypid, isnull);
473 appendStringInfo(s, ") VALUES (%s);", values->data);
476 resetStringInfo(values);
480 * Decode a DELETE entry
483 wal2sql_delete(StringInfo s,
487 appendStringInfo(s, "DELETE FROM ");
488 print_relname(s, relation);
491 * Here the same tuple is used as old and new values, selectivity will be
492 * properly reduced by relation uses DEFAULT or INDEX as REPLICA IDENTITY.
494 print_where_clause(s, relation, tuple, tuple);
495 appendStringInfoString(s, ";");
500 * Decode an UPDATE entry
503 wal2sql_update(StringInfo s,
508 TupleDesc tupdesc = RelationGetDescr(relation);
510 bool first_column = true;
512 /* If there are no new values, simply leave as there is nothing to do */
513 if (newtuple == NULL)
516 appendStringInfo(s, "UPDATE ");
517 print_relname(s, relation);
519 /* Build the SET clause with the new values */
520 appendStringInfo(s, " SET ");
521 for (natt = 0; natt < tupdesc->natts; natt++)
523 Form_pg_attribute attr;
529 attr = TupleDescAttr(tupdesc, natt);
531 /* Skip dropped columns and system columns */
532 if (attr->attisdropped || attr->attnum < 0)
535 /* Get Datum from tuple */
536 origval = heap_getattr(newtuple, natt + 1, tupdesc, &isnull);
538 /* Get output type so we can know if it's varlena */
539 getTypeOutputInfo(attr->atttypid,
540 &typoutput, &typisvarlena);
543 * TOASTed datum, but it is not changed so it can be skipped this in
544 * the SET clause of this UPDATE query.
546 if (!isnull && typisvarlena &&
547 VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
550 /* Skip comma for first colums */
553 appendStringInfoString(s, ", ");
556 first_column = false;
558 /* Print attribute name */
559 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
561 /* Get output function */
562 print_value(s, origval, attr->atttypid, isnull);
565 /* Print WHERE clause */
566 print_where_clause(s, relation, oldtuple, newtuple);
568 appendStringInfoString(s, ";");
572 * Callback for individual changed tuples
575 wal2sql_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
576 Relation relation, ReorderBufferChange *change)
580 char replident = relation->rd_rel->relreplident;
581 bool is_rel_non_selective;
583 data = ctx->output_plugin_private;
585 /* Avoid leaking memory by using and resetting our own context */
586 old = MemoryContextSwitchTo(data->context);
589 * Determine if relation is selective enough for WHERE clause generation
590 * in UPDATE and DELETE cases. A non-selective relation uses REPLICA
591 * IDENTITY set as NOTHING, or DEFAULT without an available replica
594 RelationGetIndexList(relation);
595 is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
596 (replident == REPLICA_IDENTITY_DEFAULT &&
597 !OidIsValid(relation->rd_replidindex)));
599 /* Decode entry depending on its type */
600 switch (change->action)
602 case REORDER_BUFFER_CHANGE_INSERT:
603 if (change->data.tp.newtuple != NULL)
605 OutputPluginPrepareWrite(ctx, true);
606 wal2sql_insert(ctx->out,
608 change->data.tp.newtuple);
609 OutputPluginWrite(ctx, true);
612 case REORDER_BUFFER_CHANGE_UPDATE:
613 if (!is_rel_non_selective)
615 HeapTuple oldtuple = change->data.tp.oldtuple;
616 HeapTuple newtuple = change->data.tp.newtuple;
618 OutputPluginPrepareWrite(ctx, true);
619 wal2sql_update(ctx->out,
623 OutputPluginWrite(ctx, true);
626 case REORDER_BUFFER_CHANGE_DELETE:
627 if (!is_rel_non_selective)
629 OutputPluginPrepareWrite(ctx, true);
630 wal2sql_delete(ctx->out,
632 change->data.tp.oldtuple);
633 OutputPluginWrite(ctx, true);
637 /* Should not come here */
642 MemoryContextSwitchTo(old);
643 MemoryContextReset(data->context);
647 * Callback for generic logical decoding messages.
649 * This callback handles messages emitted via pg_logical_emit_message().
650 * We use this to capture DDL commands that are emitted by event triggers.
651 * The event triggers emit messages with prefix 'ddl' containing the DDL
655 wal2sql_message(LogicalDecodingContext *ctx,
656 ReorderBufferTXN *txn,
657 XLogRecPtr message_lsn,
664 * Only process messages with the 'ddl' prefix. These are DDL statements
665 * emitted by our event triggers using pg_logical_emit_message().
667 if (strcmp(prefix, "ddl") != 0)
671 * Output the DDL statement. The message already contains the complete
672 * DDL command in executable SQL format (e.g., "ALTER TABLE foo ADD COLUMN bar TEXT;")
673 * so we output it as-is.
675 * Use appendBinaryStringInfo to properly handle the message with its size,
676 * as the message may not be null-terminated.
678 OutputPluginPrepareWrite(ctx, true);
679 appendBinaryStringInfo(ctx->out, message, message_size);
680 OutputPluginWrite(ctx, true);