Enable CDC for Postgres

To enable CDC for Postgres, a logical decoding output plug-in needs to be installed and configured.

For Gathr, the wal2json plug-in is used.

For more details, refer to Debezium connector for PostgreSQL.

Postgres Configuration

This setup has been validated with Postgres v10. Update the below configuration files for Postgres:

/var/lib/pgsql/10/data/postgresql.conf
# - Connection Settings -
listen_addresses = '*'

shared_preload_libraries = 'wal2json' # (change requires restart)
wal_level = logical
max_wal_senders = 1
max_replication_slots = 3

Configure the PostgreSQL server to allow replication between the server machine and the host on which the Debezium PostgreSQL connector is running, by editing pg_hba.conf:

local   replication   <youruser>                  trust
host    replication   <youruser>   127.0.0.1/32   trust
host    replication   <youruser>   ::1/128        trust
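After restarting PostgreSQL, it is worth confirming that the settings took effect. A quick check from psql (assuming the server runs locally and the postgres superuser is available):

```shell
psql -U postgres -c "SHOW wal_level;"                 # expected: logical
psql -U postgres -c "SHOW max_replication_slots;"     # expected: 3
psql -U postgres -c "SHOW shared_preload_libraries;"  # should include wal2json
```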

Plug-in Setup/Download

As mentioned, wal2json output plug-in is used for logical decoding. The wal2json output plug-in produces a JSON object per transaction.

Also, note that the installation process requires the PostgreSQL utility pg_config.

Verify that the PATH environment variable is set so that the utility can be found. If not, update the PATH environment variable appropriately.

PATH=/usr/pgsql-10/bin:$PATH

wal2json installation commands:

$ git clone https://github.com/eulerto/wal2json.git
$ cd wal2json
# Make sure your PATH includes the bin directory that contains the correct `pg_config`
$ PATH=/path/to/pg/bin:$PATH
$ USE_PGXS=1 make
$ USE_PGXS=1 make install
# If "USE_PGXS=1 make" fails, install the Postgres development tools first.
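Once the build completes, it can be confirmed that the shared library was installed into the Postgres library directory (assuming pg_config is on the PATH):

```shell
$ ls "$(pg_config --pkglibdir)"/wal2json.so
```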

Connector Setup/Download

For Debezium Postgres jars and detailed documentation, refer to Debezium connector for PostgreSQL.

Make sure ZooKeeper, Kafka, and Kafka Connect are already installed.

Download the connector’s plug-in archive, extract the JARs into your Kafka Connect environment, and add the directory containing the JARs to Kafka Connect’s classpath.

debezium-api-1.9.4.Final.jar
debezium-connector-postgres-1.9.4.Final.jar
debezium-core-1.9.4.Final.jar
failureaccess-1.0.1.jar
guava-30.1.1-jre.jar
postgresql-42.3.5.jar
protobuf-java-3.19.2.jar

Restart your Kafka Connect process to pick up the new JARs.

An example of installation on HDP 3.1.0 is shown below:

Check the compatibility of Debezium Postgres CDC packages with the installed Kafka version.

Extract the package to a directory and then copy the Debezium jar files under:

/usr/hdp/3.1.0.0-78/kafka/libs

Connector Configuration Files

Configure the connect-standalone.properties or connect-distributed.properties depending on the cluster setup.

Make the below settings in the chosen properties file under /usr/hdp/3.1.0.0-78/kafka/config:

plugin.path=/usr/hdp/3.1.0.0-78/kafka/libs
rest.port=6669

You also need to create a new connector properties file, which holds all the information related to the PostgreSQL server database configuration. More connector properties are available and can be enabled as per your requirements.

Below are minimum required details:

Example: /usr/hdp/3.1.0.0-78/kafka/config/connector.properties

plugin.name=wal2json
name=<any name for the connector>
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=<the address of the Postgres server>
database.port=<Postgres port, e.g. 5432>
database.user=<name of the Postgres user that has the required privileges>
database.password=<the password for the Postgres user>
database.dbname=<the name of the PostgreSQL database to connect to>
database.server.name=<any logical name for the Postgres server/cluster>
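A filled-in example of the same file (every value below is a placeholder chosen for illustration, not a default from any real environment):

```properties
plugin.name=wal2json
name=pg-cdc-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=192.168.1.10
database.port=5432
database.user=cdc_user
database.password=cdc_password
database.dbname=inventory
database.server.name=pgserver1
```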

Start the Connector

Once all of the above settings are in place, start the Debezium connector.

Use the below command to start:

nohup /usr/hdp/3.1.0.0-78/kafka/bin/connect-standalone.sh config/connect-standalone.properties config/connector.properties &

You can further monitor nohup.out for start-up messages or ERRORs while the connector starts.

Once the connector is started successfully, it is now ready to record the data changes.
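Since rest.port=6669 was configured in the Kafka Connect properties above, the Connect REST API offers another way to check that the connector is running (assuming the commands are executed on the Connect host):

```shell
# List all registered connectors
curl http://localhost:6669/connectors

# Check the status of a specific connector (use the value of the "name" property)
curl http://localhost:6669/connectors/<connector name>/status
```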

PostgreSQL Connector Test

The names of the Kafka topics take the form serverName.schemaName.tableName, where:

  • serverName is the logical name of the server/cluster as specified with the database.server.name configuration property.

  • schemaName is the name of the database schema in which the operation occurred.

  • tableName is the name of the database table on which the operation occurred.
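The topic name is simply the three components joined with dots; for example, with placeholder values:

```shell
# Assemble the Debezium topic name from its three components
# (placeholder values for illustration)
server_name=pgserver1
schema=public
table=customers
topic="${server_name}.${schema}.${table}"
echo "$topic"   # pgserver1.public.customers
```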

For verification, make a change in the configured DB table; the change should be reflected in the corresponding Kafka topic.
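For example, assuming a monitored table public.customers in database inventory, a server name of pgserver1, and a broker on localhost:9092 (all placeholder names):

```shell
# Make a change in the monitored table...
psql -U postgres -d inventory -c "INSERT INTO customers (name) VALUES ('test');"

# ...then watch the corresponding topic for the change event
/usr/hdp/3.1.0.0-78/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic pgserver1.public.customers --from-beginning
```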

[Screenshot: Enable_CDC_for_Postgres_01]

[Screenshot: Enable_CDC_for_Postgres_02]

Setup in SSL and Kerberos Environment

For a Kerberos-enabled environment, the below needs to be set in connect-standalone.properties:

# Kerberos
sasl.mechanism=GSSAPI
producer.security.protocol=SASL_PLAINTEXT

Also update the connector properties file, which holds all the information related to the Postgres database configuration (see Connector Configuration Files):

database.history.producer.security.protocol=SASL_PLAINTEXT
database.history.consumer.security.protocol=SASL_PLAINTEXT

For a Kerberos with SSL enabled environment, the below needs to be set in connect-standalone.properties:

# Kerberos
sasl.mechanism=GSSAPI
producer.security.protocol=SASL_SSL

Also update the connector properties file, which holds all the information related to the Postgres database configuration (see Connector Configuration Files):

database.history.producer.security.protocol=SASL_SSL
database.history.consumer.security.protocol=SASL_SSL

PostgreSQL Database Settings

To handle changes related to deletion of data, the replica identity should be set.

REPLICA IDENTITY is a PostgreSQL-specific, table-level setting that determines how much information is available to logical decoding for UPDATE and DELETE events.

If Postgres CDC is not emitting data for DELETE and UPDATE queries to the Kafka topic, then follow the below steps:

  • Log in to Postgres.

  • Check the replica identity status of the target table:

SELECT CASE relreplident
         WHEN 'd' THEN 'default'
         WHEN 'n' THEN 'nothing'
         WHEN 'f' THEN 'full'
         WHEN 'i' THEN 'index'
       END AS replica_identity
FROM pg_class
WHERE oid = '<table_name>'::regclass;

[Screenshot: CDC_01]

If it is 'default' (the initial setting), change it to FULL:

ALTER TABLE <table name> REPLICA IDENTITY FULL;
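If many tables need the change, the ALTER statements can be generated for an entire schema (a sketch, assuming the tables live in the public schema; review the generated statements before running them):

```sql
SELECT 'ALTER TABLE ' || quote_ident(schemaname) || '.' || quote_ident(tablename)
       || ' REPLICA IDENTITY FULL;'
FROM pg_tables
WHERE schemaname = 'public';
```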

Cross-check whether the replica identity has changed to full:

[Screenshot: CDC_02]

Now, update and delete events will also be captured.
