Enable CDC for Postgres
To enable CDC for Postgres, a logical decoding output plug-in needs to be installed and configured.
For Gathr, the wal2json plug-in is used.
For more details, refer to the Debezium connector for PostgreSQL documentation.
Postgres Configuration
This setup was performed on Postgres v10. Update the following Postgres configuration files:
/var/lib/pgsql/10/data/postgresql.conf
# - Connection Settings -
listen_addresses = '*'
shared_preload_libraries='wal2json' # (change requires restart)
wal_level=logical
max_wal_senders=1
max_replication_slots=3
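After editing postgresql.conf, you can check the file against the values above before restarting. The sketch below uses a hypothetical helper, `check_cdc_settings`, which is not part of Gathr, Debezium, or PostgreSQL. It assumes simple `key = value` lines and ignores `include` directives. On a running server, `SHOW wal_level;` in psql reports the value that is actually in effect after the restart.

```python
# Hypothetical helper: compare the CDC-related settings in postgresql.conf
# with the values this guide expects. Simplified parser: one key=value per
# line, '#' starts a comment, include directives are not followed.
def parse_conf(text):
    settings = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop trailing comments
        if "=" not in line:
            continue
        key, value = line.split("=", 1)
        # A later definition overrides an earlier one, as in PostgreSQL.
        settings[key.strip()] = value.strip().strip("'")
    return settings


def check_cdc_settings(text):
    """Return a list of problems; an empty list means the checks passed."""
    s = parse_conf(text)
    problems = []
    if s.get("wal_level") != "logical":
        problems.append("wal_level must be logical")
    libs = [lib.strip() for lib in s.get("shared_preload_libraries", "").split(",")]
    if "wal2json" not in libs:
        problems.append("shared_preload_libraries must include wal2json")
    # Only check these when set explicitly; unset means the server default.
    for key in ("max_wal_senders", "max_replication_slots"):
        if key in s and int(s[key]) < 1:
            problems.append(f"{key} must be at least 1")
    return problems
```

For example, `print(check_cdc_settings(open("/var/lib/pgsql/10/data/postgresql.conf").read()))` prints `[]` when the file matches the settings above.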
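Configure the PostgreSQL server to allow replication between the server machine and the host on which the Debezium PostgreSQL connector is running. Add the following entries to pg_hba.conf. They allow replication for <youruser> over the local socket, the IPv4 loopback address, and the IPv6 loopback address, respectively:
local replication <youruser> trust
host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust
If the connector runs on a different machine, also add a host entry for that machine's address.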
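The sketch below confirms that pg_hba.conf contains replication entries for the connector user. `replication_entries` is a hypothetical helper, and it handles only the simple whitespace-separated form shown above. It ignores `+group` and `@file` user references and does not evaluate rule order.

```python
# Hypothetical helper: list the pg_hba.conf records whose DATABASE column is
# "replication" and whose USER column matches `user` (or "all").
def replication_entries(text, user):
    """Return (connection_type, address) pairs; address is None for 'local'."""
    entries = []
    for raw in text.splitlines():
        fields = raw.split("#", 1)[0].split()  # strip comments, split columns
        if len(fields) < 4 or fields[1] != "replication":
            continue
        conn_type, users = fields[0], fields[2].split(",")
        if user not in users and "all" not in users:
            continue
        # 'local' records have no ADDRESS column; host records do.
        address = None if conn_type == "local" else fields[3]
        entries.append((conn_type, address))
    return entries
```

An empty result for your connector user means none of the entries above are in place yet.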
Plug-in Setup/Download
As mentioned above, the wal2json output plug-in is used for logical decoding. It produces a JSON object per transaction.
Note that the installation process requires the PostgreSQL utility pg_config.
Verify that the PATH environment variable is set so that the utility can be found. If it is not, update PATH accordingly:
PATH=/usr/pgsql-10/bin:$PATH
wal2json installation commands:
$ git clone https://github.com/eulerto/wal2json.git
$ cd wal2json
# Make sure your PATH includes the bin directory that contains the correct `pg_config`
$ PATH=/path/to/pg/bin:$PATH
$ USE_PGXS=1 make
$ USE_PGXS=1 make install
# If "USE_PGXS=1 make" does not work, install the PostgreSQL development package first.
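Most build failures here come from a missing or wrong pg_config. You can check which one the build will use before running make. The sketch below relies on Python's standard `shutil.which`. `find_pg_config` is a hypothetical name, and `/usr/pgsql-10/bin` is simply the directory used earlier in this guide.

```python
import os
import shutil


def find_pg_config(extra_bin_dir=None, path=None):
    """Return the pg_config that a build would pick up, or None if absent.

    extra_bin_dir is prepended to the search path, mirroring
    PATH=/usr/pgsql-10/bin:$PATH. path defaults to the current PATH.
    """
    search = os.environ.get("PATH", "") if path is None else path
    parts = [p for p in (extra_bin_dir, search) if p]  # skip empty entries
    if not parts:
        return None
    return shutil.which("pg_config", path=os.pathsep.join(parts))
```

For example, `find_pg_config("/usr/pgsql-10/bin")` returns the full path of the pg_config found first, or None. In that case, install the PostgreSQL development package as noted above.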
Connector Setup/Download
For the Debezium Postgres JARs and detailed documentation, refer to the Debezium connector for PostgreSQL.
Make sure ZooKeeper, Kafka, and Kafka Connect are already installed.
Download the connector's plug-in archive, extract the JARs into your Kafka Connect environment, and add the directory containing the JARs to Kafka Connect's classpath.
debezium-api-1.9.4.Final.jar
debezium-connector-postgres-1.9.4.Final.jar
debezium-core-1.9.4.Final.jar
failureaccess-1.0.1.jar
guava-30.1.1-jre.jar
postgresql-42.3.5.jar
protobuf-java-3.19.2.jar
Restart the Kafka Connect process to pick up the new JARs.
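Before restarting, you can check that every JAR from the list above was actually copied. `missing_jars` is a hypothetical helper. The file names are taken verbatim from the 1.9.4.Final list, so update them if you use a different Debezium version.

```python
from pathlib import Path

# JAR file names copied from the list above (Debezium 1.9.4.Final bundle).
REQUIRED_JARS = (
    "debezium-api-1.9.4.Final.jar",
    "debezium-connector-postgres-1.9.4.Final.jar",
    "debezium-core-1.9.4.Final.jar",
    "failureaccess-1.0.1.jar",
    "guava-30.1.1-jre.jar",
    "postgresql-42.3.5.jar",
    "protobuf-java-3.19.2.jar",
)


def missing_jars(lib_dir, required=REQUIRED_JARS):
    """Return the required JAR names that are not present in lib_dir."""
    present = {p.name for p in Path(lib_dir).glob("*.jar")}
    return [jar for jar in required if jar not in present]
```

On the HDP example below, `missing_jars("/usr/hdp/3.1.0.0-78/kafka/libs")` should return an empty list once the copy is complete.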
An example of installation on HDP 3.1.0 is shown below:
Check the compatibility of Debezium Postgres CDC packages with the installed Kafka version.
Extract the package to a directory and then copy the Debezium jar files under:
/usr/hdp/3.1.0.0-78/kafka/libs
Connector Configuration Files
Configure connect-standalone.properties or connect-distributed.properties, depending on the cluster setup.
Make the following settings in the file under /usr/hdp/3.1.0.0-78/kafka/config:
plugin.path=/usr/hdp/3.1.0.0-78/kafka/libs
rest.port=6669
You also need to create a new connector properties file containing the PostgreSQL server and database configuration. Additional properties are available and can be enabled as required.
The minimum required settings are:
Example: /usr/hdp/3.1.0.0-78/kafka/config/connector.properties
plugin.name=wal2json
name=<any name for the connector>
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.user=<name of the Postgres user that has the required privileges>
database.server.name=<any logical name for the Postgres server/cluster>
database.port=<Postgres port, e.g. 5432>
database.hostname=<address of the Postgres server>
database.password=<password for the Postgres user>
database.dbname=<name of the PostgreSQL database to connect to>
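Typos such as a doubled `=` or a leftover `<placeholder>` are easy to miss in this file. The sketch below checks for them, along with the minimum keys listed above. `validate_connector_properties` is hypothetical, and its parser is a simplification of the Java .properties format: it handles only `key=value` lines, so `:` separators and line continuations are not handled.

```python
# Keys from the minimum list above.
REQUIRED_KEYS = (
    "name", "connector.class", "plugin.name", "database.hostname",
    "database.port", "database.user", "database.password",
    "database.dbname", "database.server.name",
)


def validate_connector_properties(text):
    """Return (properties_dict, list_of_problems) for a key=value file."""
    props, problems = {}, []
    for lineno, raw in enumerate(text.splitlines(), start=1):
        line = raw.strip()
        if not line or line[0] in "#!":  # blank line or comment
            continue
        key, sep, value = line.partition("=")
        key, value = key.strip(), value.strip()
        if not sep:
            problems.append(f"line {lineno}: no '=' separator")
            continue
        if value.startswith("="):
            problems.append(f"line {lineno}: '{key}' has a doubled '='")
        if value.startswith("<"):
            problems.append(f"line {lineno}: '{key}' still has a placeholder value")
        props[key] = value
    for key in REQUIRED_KEYS:
        if key not in props:
            problems.append(f"missing required key '{key}'")
    return props, problems
```

Run it on the file's contents before starting the connector. An empty problem list means the minimum settings are present and filled in.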
Start the Connector
Once all the above settings are in place, start the Debezium connector.
From the /usr/hdp/3.1.0.0-78/kafka directory, run:
nohup /usr/hdp/3.1.0.0-78/kafka/bin/connect-standalone.sh config/connect-standalone.properties config/connector.properties &
Monitor nohup.out (nohup's default output file, written to the current directory) for start-up messages or ERRORs.
Once the connector has started successfully, it is ready to capture data changes.
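Besides the log, you can query the Kafka Connect REST interface, which listens on `rest.port` (6669 above). Its `GET /connectors/{name}/status` endpoint reports the connector and task states. The sketch below assumes the worker is reachable on localhost, and its helper names are hypothetical. It uses only the Python standard library.

```python
import json
import urllib.parse
import urllib.request


def connector_status_url(name, host="localhost", port=6669):
    # Kafka Connect REST endpoint: GET /connectors/{name}/status
    return f"http://{host}:{port}/connectors/{urllib.parse.quote(name, safe='')}/status"


def summarize_status(payload):
    """Return (connector_state, [task_states], healthy) from a status response body."""
    data = json.loads(payload)
    connector_state = data["connector"]["state"]
    task_states = [task["state"] for task in data.get("tasks", [])]
    healthy = connector_state == "RUNNING" and bool(task_states) and all(
        state == "RUNNING" for state in task_states
    )
    return connector_state, task_states, healthy


def fetch_status(name, host="localhost", port=6669, timeout=5.0):
    """Query a running Connect worker; raises URLError if it is unreachable."""
    with urllib.request.urlopen(connector_status_url(name, host, port), timeout=timeout) as resp:
        return summarize_status(resp.read().decode("utf-8"))
```

`fetch_status("<name from connector.properties>")` should report `RUNNING` for the connector and each of its tasks. A `FAILED` task usually comes with a stack trace in nohup.out.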
PostgreSQL Connector Test
The Kafka topic names take the form serverName.schemaName.tableName, where:
serverName is the logical name of the connector, as specified by the database.server.name configuration property.
schemaName is the name of the schema in which the operation occurred.
tableName is the name of the database table on which the operation occurred.
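The sketch below shows how the topic name is assembled, which helps when locating the right topic to inspect. For the Debezium PostgreSQL connector, the middle segment is the PostgreSQL schema (for example, `public`). The helper names are hypothetical.

```python
def cdc_topic(server_name, schema_name, table_name):
    """Build the topic name for one captured table: serverName.schemaName.tableName."""
    return ".".join((server_name, schema_name, table_name))


def split_topic(topic):
    """Split a topic name back into (serverName, schemaName, tableName).

    rsplit keeps any dots in the server name intact. It assumes the schema
    and table names themselves contain no dots.
    """
    server_name, schema_name, table_name = topic.rsplit(".", 2)
    return server_name, schema_name, table_name
```

With `database.server.name=pgserver1`, changes to `public.customers` would go to `pgserver1.public.customers`.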
To verify the setup, make a change to a table in the configured database. The change should appear in the corresponding Kafka topic.
Setup in SSL and Kerberos Environment
For a Kerberos-enabled environment, set the following in connect-standalone.properties:
# Kerberos
sasl.mechanism=GSSAPI
producer.security.protocol=SASL_PLAINTEXT
Also update the connector properties file, which holds the Postgres database configuration (see Connector Configuration Files):
database.history.producer.security.protocol=SASL_PLAINTEXT
database.history.consumer.security.protocol=SASL_PLAINTEXT
For a Kerberos environment with SSL enabled, set the following in connect-standalone.properties:
# Kerberos
sasl.mechanism=GSSAPI
producer.security.protocol=SASL_SSL
Also update the connector properties file, which holds the Postgres database configuration (see Connector Configuration Files):
database.history.producer.security.protocol=SASL_SSL
database.history.consumer.security.protocol=SASL_SSL
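The two Kerberos variants differ only in the security protocol value. The sketch below generates both property sets from one switch, which keeps the worker and connector files consistent. `kerberos_settings` and `to_properties` are hypothetical helpers. They emit only the properties listed above, and keytab or JAAS configuration is outside this sketch.

```python
def kerberos_settings(use_ssl):
    """Return (worker_props, connector_props) for the Kerberos setups above."""
    protocol = "SASL_SSL" if use_ssl else "SASL_PLAINTEXT"
    worker = {  # goes into connect-standalone.properties
        "sasl.mechanism": "GSSAPI",
        "producer.security.protocol": protocol,
    }
    connector = {  # goes into the connector properties file
        "database.history.producer.security.protocol": protocol,
        "database.history.consumer.security.protocol": protocol,
    }
    return worker, connector


def to_properties(settings):
    """Render a dict as key=value lines in insertion order."""
    return "\n".join(f"{key}={value}" for key, value in settings.items()) + "\n"
```

For example, `print(to_properties(kerberos_settings(use_ssl=True)[0]))` prints the two worker lines for the SSL case.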
PostgreSQL Database Settings
To capture changes related to deletion of data, set the table's replica identity.
REPLICA IDENTITY is a PostgreSQL-specific, table-level setting that determines the amount of information available to logical decoding for UPDATE and DELETE events.
If Postgres CDC is not emitting events for DELETE and UPDATE statements to the Kafka topic, follow these steps:
Log in to Postgres (for example, with psql).
Check the replica identity of the target table:
SELECT CASE relreplident
WHEN 'd' THEN 'default'
WHEN 'n' THEN 'nothing'
WHEN 'f' THEN 'full'
WHEN 'i' THEN 'index'
END AS replica_identity
FROM pg_class
WHERE oid = '<table_name>'::regclass;
If it is default (the default setting for tables), change it to FULL:
ALTER TABLE <table_name> REPLICA IDENTITY FULL;
Rerun the query above to confirm that the replica identity has changed to full.
UPDATE and DELETE changes are now also captured.
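If you script this check, the `relreplident` code from the query above maps directly to the fix. The sketch below mirrors the SQL `CASE` expression. `replica_identity_fix` is a hypothetical helper. It suggests FULL whenever the identity is not already full, which is slightly broader than the default-only rule above. The table name is inserted as given, so pass names that need quoting already quoted.

```python
# Mirrors the CASE expression above: pg_class.relreplident codes.
REPLICA_IDENTITY = {"d": "default", "n": "nothing", "f": "full", "i": "index"}


def replica_identity_fix(table_name, relreplident):
    """Return the ALTER TABLE statement to run, or None if already FULL."""
    identity = REPLICA_IDENTITY[relreplident]
    if identity == "full":
        return None
    return f"ALTER TABLE {table_name} REPLICA IDENTITY FULL;"
```

For a table that still reports `d`, `replica_identity_fix("public.customers", "d")` returns the same ALTER TABLE statement shown above.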