2 47.1. Logical Decoding Examples #
4 The following example demonstrates controlling logical decoding using
7 Before you can use logical decoding, you must set wal_level to logical
8 and max_replication_slots to at least 1. Then, you should connect to
9 the target database (in the example below, postgres) as a superuser.
10 postgres=# -- Create a slot named 'regression_slot' using the output plugin 'tes
12 postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', '
13 test_decoding', false, true);
15 -----------------+-----------
16 regression_slot | 0/16B1970
19 postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, c
20 onfirmed_flush_lsn FROM pg_replication_slots;
21 slot_name | plugin | slot_type | database | active | restart_lsn |
23 -----------------+---------------+-----------+----------+--------+-------------+
25 regression_slot | test_decoding | logical | postgres | f | 0/16A4408 |
29 postgres=# -- There are no changes to see yet
30 postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NU
36 postgres=# CREATE TABLE data(id serial primary key, data text);
39 postgres=# -- DDL isn't replicated, so all you'll see is the transaction
40 postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NU
43 -----------+-------+--------------
44 0/BA2DA58 | 10297 | BEGIN 10297
45 0/BA5A5A0 | 10297 | COMMIT 10297
48 postgres=# -- Once changes are read, they're consumed and not emitted
49 postgres=# -- in a subsequent call:
50 postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NU
57 postgres=*# INSERT INTO data(data) VALUES('1');
58 postgres=*# INSERT INTO data(data) VALUES('2');
61 postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NU
64 -----------+-------+---------------------------------------------------------
65 0/BA5A688 | 10298 | BEGIN 10298
66 0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
67 0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
68 0/BA5A8A8 | 10298 | COMMIT 10298
71 postgres=# INSERT INTO data(data) VALUES('3');
73 postgres=# -- You can also peek ahead in the change stream without consuming cha
75 postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, N
78 -----------+-------+---------------------------------------------------------
79 0/BA5A8E0 | 10299 | BEGIN 10299
80 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
81 0/BA5A990 | 10299 | COMMIT 10299
84 postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same c
86 postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, N
89 -----------+-------+---------------------------------------------------------
90 0/BA5A8E0 | 10299 | BEGIN 10299
91 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
92 0/BA5A990 | 10299 | COMMIT 10299
95 postgres=# -- options can be passed to output plugin, to influence the formattin
97 postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, N
98 ULL, 'include-timestamp', 'on');
100 -----------+-------+---------------------------------------------------------
101 0/BA5A8E0 | 10299 | BEGIN 10299
102 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
103 0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
106 postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
107 postgres=# -- server resources:
108 postgres=# SELECT pg_drop_replication_slot('regression_slot');
109 pg_drop_replication_slot
110 -----------------------
114 The following examples show how logical decoding is controlled over the
115 streaming replication protocol, using the program pg_recvlogical
116 included in the PostgreSQL distribution. This requires that client
117 authentication is set up to allow replication connections (see
118 Section 26.2.5.1) and that max_wal_senders is set sufficiently high to
119 allow an additional connection. The second example shows how to stream
120 two-phase transactions. Before you use two-phase commands, you must set
121 max_prepared_transactions to at least 1.
123 $ pg_recvlogical -d postgres --slot=test --create-slot
124 $ pg_recvlogical -d postgres --slot=test --start -f -
126 $ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
129 table public.data: INSERT: id[integer]:4 data[text]:'4'
132 $ pg_recvlogical -d postgres --slot=test --drop-slot
135 $ pg_recvlogical -d postgres --slot=test --create-slot --enable-two-phase
136 $ pg_recvlogical -d postgres --slot=test --start -f -
138 $ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACT
142 table public.data: INSERT: id[integer]:5 data[text]:'5'
143 PREPARE TRANSACTION 'test', txid 694
145 $ psql -d postgres -c "COMMIT PREPARED 'test';"
147 COMMIT PREPARED 'test', txid 694
149 $ pg_recvlogical -d postgres --slot=test --drop-slot
151 The following example shows SQL interface that can be used to decode
152 prepared transactions. Before you use two-phase commit commands, you
153 must set max_prepared_transactions to at least 1. You must also have
154 set the two-phase parameter as 'true' while creating the slot using
155 pg_create_logical_replication_slot Note that we will stream the entire
156 transaction after the commit if it is not already decoded.
158 postgres=*# INSERT INTO data(data) VALUES('5');
159 postgres=*# PREPARE TRANSACTION 'test_prepared1';
161 postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NU
164 -----------+-----+---------------------------------------------------------
165 0/1689DC0 | 529 | BEGIN 529
166 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
167 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
170 postgres=# COMMIT PREPARED 'test_prepared1';
171 postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
174 -----------+-----+--------------------------------------------
175 0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
178 postgres=#-- you can also rollback a prepared transaction
180 postgres=*# INSERT INTO data(data) VALUES('6');
181 postgres=*# PREPARE TRANSACTION 'test_prepared2';
182 postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
185 -----------+-----+---------------------------------------------------------
186 0/168A180 | 530 | BEGIN 530
187 0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6'
188 0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530
191 postgres=# ROLLBACK PREPARED 'test_prepared2';
192 postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
195 -----------+-----+----------------------------------------------
196 0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530