1 /*-------------------------------------------------------------------------
4 * Logical decoding output plugin generating SQL queries based
7 * Copyright (c) 1996-2025, PostgreSQL Global Development Group
10 * decoder_raw/decoder_raw.c
12 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "catalog/pg_type.h"
21 #include "nodes/parsenodes.h"
22 #include "replication/output_plugin.h"
23 #include "replication/logical.h"
24 #include "utils/builtins.h"
25 #include "utils/lsyscache.h"
26 #include "utils/memutils.h"
27 #include "utils/rel.h"
28 #include "utils/relcache.h"
29 #include "utils/syscache.h"
30 #include "utils/typcache.h"
35 /* These must be available to pg_dlsym() */
36 extern void _PG_init(void);
37 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
40 * Structure storing the plugin specifications and options.
44 MemoryContext context;
45 bool include_transaction;
48 static void decoder_raw_startup(LogicalDecodingContext *ctx,
49 OutputPluginOptions *opt,
51 static void decoder_raw_shutdown(LogicalDecodingContext *ctx);
52 static void decoder_raw_begin_txn(LogicalDecodingContext *ctx,
53 ReorderBufferTXN *txn);
54 static void decoder_raw_commit_txn(LogicalDecodingContext *ctx,
55 ReorderBufferTXN *txn,
56 XLogRecPtr commit_lsn);
57 static void decoder_raw_change(LogicalDecodingContext *ctx,
58 ReorderBufferTXN *txn, Relation rel,
59 ReorderBufferChange *change);
64 /* other plugins can perform things here */
67 /* specify output plugin callbacks */
69 _PG_output_plugin_init(OutputPluginCallbacks *cb)
71 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
73 cb->startup_cb = decoder_raw_startup;
74 cb->begin_cb = decoder_raw_begin_txn;
75 cb->change_cb = decoder_raw_change;
76 cb->commit_cb = decoder_raw_commit_txn;
77 cb->shutdown_cb = decoder_raw_shutdown;
81 /* initialize this plugin */
83 decoder_raw_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
89 data = palloc(sizeof(DecoderRawData));
90 data->context = AllocSetContextCreate(ctx->context,
91 "Raw decoder context",
92 ALLOCSET_DEFAULT_SIZES);
93 data->include_transaction = false;
95 ctx->output_plugin_private = data;
97 /* Default output format */
98 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
100 foreach(option, ctx->output_plugin_options)
102 DefElem *elem = lfirst(option);
104 Assert(elem->arg == NULL || IsA(elem->arg, String));
106 if (strcmp(elem->defname, "include_transaction") == 0)
108 /* if option does not provide a value, it means its value is true */
109 if (elem->arg == NULL)
110 data->include_transaction = true;
111 else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
113 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
114 errmsg("could not parse value \"%s\" for parameter \"%s\"",
115 strVal(elem->arg), elem->defname)));
117 else if (strcmp(elem->defname, "output_format") == 0)
121 if (elem->arg == NULL)
123 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
124 errmsg("No value specified for parameter \"%s\"",
127 format = strVal(elem->arg);
129 if (strcmp(format, "textual") == 0)
130 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
131 else if (strcmp(format, "binary") == 0)
132 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
135 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
136 errmsg("Incorrect value \"%s\" for parameter \"%s\"",
137 format, elem->defname)));
142 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
143 errmsg("option \"%s\" = \"%s\" is unknown",
145 elem->arg ? strVal(elem->arg) : "(null)")));
150 /* cleanup this plugin's resources */
152 decoder_raw_shutdown(LogicalDecodingContext *ctx)
154 DecoderRawData *data = ctx->output_plugin_private;
156 /* cleanup our own resources via memory context reset */
157 MemoryContextDelete(data->context);
162 decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
164 DecoderRawData *data = ctx->output_plugin_private;
166 /* Write to the plugin only if there is */
167 if (data->include_transaction)
169 OutputPluginPrepareWrite(ctx, true);
170 appendStringInfoString(ctx->out, "BEGIN;");
171 OutputPluginWrite(ctx, true);
175 /* COMMIT callback */
177 decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
178 XLogRecPtr commit_lsn)
180 DecoderRawData *data = ctx->output_plugin_private;
182 /* Write to the plugin only if there is */
183 if (data->include_transaction)
185 OutputPluginPrepareWrite(ctx, true);
186 appendStringInfoString(ctx->out, "COMMIT;");
187 OutputPluginWrite(ctx, true);
192 * Print literal `outputstr' already represented as string of type `typid'
193 * into stringbuf `s'.
195 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
196 * if standard_conforming_strings were enabled.
199 print_literal(StringInfo s, Oid typid, char *outputstr)
206 if (outputstr[0] == 't')
207 appendStringInfoString(s, "true");
209 appendStringInfoString(s, "false");
216 /* NB: We don't care about Inf, NaN et al. */
217 appendStringInfoString(s, outputstr);
224 * Numeric can have NaN. Float can have Nan, Infinity and
225 * -Infinity. These need to be quoted.
227 if (strcmp(outputstr, "NaN") == 0 ||
228 strcmp(outputstr, "Infinity") == 0 ||
229 strcmp(outputstr, "-Infinity") == 0)
230 appendStringInfo(s, "'%s'", outputstr);
232 appendStringInfoString(s, outputstr);
236 appendStringInfo(s, "B'%s'", outputstr);
240 appendStringInfoChar(s, '\'');
241 for (valptr = outputstr; *valptr; valptr++)
245 if (SQL_STR_DOUBLE(ch, false))
246 appendStringInfoChar(s, ch);
247 appendStringInfoChar(s, ch);
249 appendStringInfoChar(s, '\'');
255 * Print a relation name into the StringInfo provided by caller.
258 print_relname(StringInfo s, Relation rel)
260 Form_pg_class class_form = RelationGetForm(rel);
262 appendStringInfoString(s,
263 quote_qualified_identifier(
265 get_rel_namespace(RelationGetRelid(rel))),
266 NameStr(class_form->relname)));
270 * Print a value into the StringInfo provided by caller.
273 print_value(StringInfo s, Datum origval, Oid typid, bool isnull)
278 /* Query output function */
279 getTypeOutputInfo(typid,
280 &typoutput, &typisvarlena);
284 appendStringInfoString(s, "null");
285 else if (typisvarlena &&
286 VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
289 * This should not happen, the column and its value can be skipped
290 * properly. This code is let on purpose to avoid any traps in this
291 * area in the future, generating useless queries in non-assert
295 appendStringInfoString(s, "unchanged-toast-datum");
297 else if (!typisvarlena)
298 print_literal(s, typid,
299 OidOutputFunctionCall(typoutput, origval));
302 /* Definitely detoasted Datum */
305 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
306 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
311 * Print a WHERE clause item
314 print_where_clause_item(StringInfo s,
320 Form_pg_attribute attr;
323 TupleDesc tupdesc = RelationGetDescr(relation);
325 attr = TupleDescAttr(tupdesc, natt - 1);
327 /* Skip dropped columns and system columns */
328 if (attr->attisdropped || attr->attnum < 0)
331 /* Skip comma for first colums */
333 appendStringInfoString(s, " AND ");
335 *first_column = false;
337 /* Print attribute name */
338 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
340 /* Get Datum from tuple */
341 origval = heap_getattr(tuple, natt, tupdesc, &isnull);
343 /* Get output function */
344 print_value(s, origval, attr->atttypid, isnull);
348 * Generate a WHERE clause for UPDATE or DELETE.
351 print_where_clause(StringInfo s,
356 TupleDesc tupdesc = RelationGetDescr(relation);
358 bool first_column = true;
360 Assert(relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
361 relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
362 relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
364 /* Build the WHERE clause */
365 appendStringInfoString(s, " WHERE ");
367 RelationGetIndexList(relation);
368 /* Generate WHERE clause using new values of REPLICA IDENTITY */
369 if (OidIsValid(relation->rd_replidindex))
374 /* Use all the values associated with the index */
375 indexRel = index_open(relation->rd_replidindex, AccessShareLock);
376 for (key = 0; key < indexRel->rd_index->indnatts; key++)
378 int relattr = indexRel->rd_index->indkey.values[key];
381 * For a relation having REPLICA IDENTITY set at DEFAULT or INDEX,
382 * if one of the columns used for tuple selectivity is changed,
383 * the old tuple data is not NULL and need to be used for tuple
384 * selectivity. If no such columns are updated, old tuple data is
387 print_where_clause_item(s, relation,
388 oldtuple ? oldtuple : newtuple,
389 relattr, &first_column);
391 index_close(indexRel, NoLock);
395 /* We need absolutely some values for tuple selectivity now */
396 Assert(oldtuple != NULL &&
397 relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
400 * Fallback to default case, use of old values and print WHERE clause
401 * using all the columns. This is actually the code path for FULL.
403 for (natt = 0; natt < tupdesc->natts; natt++)
404 print_where_clause_item(s, relation, oldtuple,
405 natt + 1, &first_column);
409 * Decode an INSERT entry
412 decoder_raw_insert(StringInfo s,
416 TupleDesc tupdesc = RelationGetDescr(relation);
418 bool first_column = true;
419 StringInfo values = makeStringInfo();
421 /* Initialize string info for values */
422 initStringInfo(values);
425 appendStringInfo(s, "INSERT INTO ");
426 print_relname(s, relation);
427 appendStringInfo(s, " (");
429 /* Build column names and values */
430 for (natt = 0; natt < tupdesc->natts; natt++)
432 Form_pg_attribute attr;
436 attr = TupleDescAttr(tupdesc, natt);
438 /* Skip dropped columns and system columns */
439 if (attr->attisdropped || attr->attnum < 0)
442 /* Skip comma for first colums */
445 appendStringInfoString(s, ", ");
446 appendStringInfoString(values, ", ");
449 first_column = false;
451 /* Print attribute name */
452 appendStringInfo(s, "%s", quote_identifier(NameStr(attr->attname)));
454 /* Get Datum from tuple */
455 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
457 /* Get output function */
458 print_value(values, origval, attr->atttypid, isnull);
462 appendStringInfo(s, ") VALUES (%s);", values->data);
465 resetStringInfo(values);
469 * Decode a DELETE entry
472 decoder_raw_delete(StringInfo s,
476 appendStringInfo(s, "DELETE FROM ");
477 print_relname(s, relation);
480 * Here the same tuple is used as old and new values, selectivity will be
481 * properly reduced by relation uses DEFAULT or INDEX as REPLICA IDENTITY.
483 print_where_clause(s, relation, tuple, tuple);
484 appendStringInfoString(s, ";");
489 * Decode an UPDATE entry
492 decoder_raw_update(StringInfo s,
497 TupleDesc tupdesc = RelationGetDescr(relation);
499 bool first_column = true;
501 /* If there are no new values, simply leave as there is nothing to do */
502 if (newtuple == NULL)
505 appendStringInfo(s, "UPDATE ");
506 print_relname(s, relation);
508 /* Build the SET clause with the new values */
509 appendStringInfo(s, " SET ");
510 for (natt = 0; natt < tupdesc->natts; natt++)
512 Form_pg_attribute attr;
518 attr = TupleDescAttr(tupdesc, natt);
520 /* Skip dropped columns and system columns */
521 if (attr->attisdropped || attr->attnum < 0)
524 /* Get Datum from tuple */
525 origval = heap_getattr(newtuple, natt + 1, tupdesc, &isnull);
527 /* Get output type so we can know if it's varlena */
528 getTypeOutputInfo(attr->atttypid,
529 &typoutput, &typisvarlena);
532 * TOASTed datum, but it is not changed so it can be skipped this in
533 * the SET clause of this UPDATE query.
535 if (!isnull && typisvarlena &&
536 VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
539 /* Skip comma for first colums */
542 appendStringInfoString(s, ", ");
545 first_column = false;
547 /* Print attribute name */
548 appendStringInfo(s, "%s = ", quote_identifier(NameStr(attr->attname)));
550 /* Get output function */
551 print_value(s, origval, attr->atttypid, isnull);
554 /* Print WHERE clause */
555 print_where_clause(s, relation, oldtuple, newtuple);
557 appendStringInfoString(s, ";");
561 * Callback for individual changed tuples
564 decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
565 Relation relation, ReorderBufferChange *change)
567 DecoderRawData *data;
569 char replident = relation->rd_rel->relreplident;
570 bool is_rel_non_selective;
572 data = ctx->output_plugin_private;
574 /* Avoid leaking memory by using and resetting our own context */
575 old = MemoryContextSwitchTo(data->context);
578 * Determine if relation is selective enough for WHERE clause generation
579 * in UPDATE and DELETE cases. A non-selective relation uses REPLICA
580 * IDENTITY set as NOTHING, or DEFAULT without an available replica
583 RelationGetIndexList(relation);
584 is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
585 (replident == REPLICA_IDENTITY_DEFAULT &&
586 !OidIsValid(relation->rd_replidindex)));
588 /* Decode entry depending on its type */
589 switch (change->action)
591 case REORDER_BUFFER_CHANGE_INSERT:
592 if (change->data.tp.newtuple != NULL)
594 OutputPluginPrepareWrite(ctx, true);
595 decoder_raw_insert(ctx->out,
597 change->data.tp.newtuple);
598 OutputPluginWrite(ctx, true);
601 case REORDER_BUFFER_CHANGE_UPDATE:
602 if (!is_rel_non_selective)
604 HeapTuple oldtuple = change->data.tp.oldtuple;
605 HeapTuple newtuple = change->data.tp.newtuple;
607 OutputPluginPrepareWrite(ctx, true);
608 decoder_raw_update(ctx->out,
612 OutputPluginWrite(ctx, true);
615 case REORDER_BUFFER_CHANGE_DELETE:
616 if (!is_rel_non_selective)
618 OutputPluginPrepareWrite(ctx, true);
619 decoder_raw_delete(ctx->out,
621 change->data.tp.oldtuple);
622 OutputPluginWrite(ctx, true);
626 /* Should not come here */
631 MemoryContextSwitchTo(old);
632 MemoryContextReset(data->context);