Enable CDC for SQL Server

The database operator must enable CDC for the table(s) that should be captured by the connector.

For more details, refer to Debezium connector for SQL Server.

MSSQL Configuration

Before using the Debezium SQL Server connector to monitor the changes committed on SQL Server, first enable CDC on a monitored database.

-- ====
-- Enable Database for CDC template
-- ====
USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO
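
To confirm that the database is now enabled for CDC, you can check the `is_cdc_enabled` flag in `sys.databases` (a quick sanity check; `MyDB` is the example database name used above):

```sql
-- Returns 1 in is_cdc_enabled once sp_cdc_enable_db has succeeded
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = 'MyDB'
GO
```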

Then enable CDC for each table that you plan to monitor.

-- =========
-- Enable a Table Specifying Filegroup Option Template
-- =========
USE MyDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'MyTable',
@role_name = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 1
GO

Start the job to capture the data.

EXEC sys.sp_cdc_start_job @job_type = N'capture'
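
To confirm that the capture job is registered (and to see the cleanup job as well), you can list the CDC jobs for the database:

```sql
-- Lists the capture and cleanup jobs with their current configuration
EXEC sys.sp_cdc_help_jobs
GO
```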

Verify that the user has access to the CDC table.

-- Verify that the connector user has access; this query should not return an empty result
EXEC sys.sp_cdc_help_change_data_capture
GO

If the result is empty, make sure that the user has privileges to access both the capture instance and the CDC tables.

The following query shows which tables are tracked by CDC:

SELECT [name], is_tracked_by_cdc FROM sys.tables

Inserts, updates, and deletes on the monitored tables should now be reflected in the CDC change tables.
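
As a quick way to see those changes, you can read the change table through the generated table-valued function (a sketch; the capture instance name `dbo_MyTable` follows from the schema and table enabled above):

```sql
-- Read all captured changes for the dbo.MyTable capture instance
DECLARE @from_lsn binary(10), @to_lsn binary(10)
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_MyTable')
SET @to_lsn = sys.fn_cdc_get_max_lsn()
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_MyTable(@from_lsn, @to_lsn, N'all')
GO
```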


Connector Setup

Download the connector plugin archive for the required version. Refer to Debezium Releases Overview.

Check compatibility with your MSSQL DB and Kafka versions.

Extract the JARs into your Kafka environment and add the directory with the JARs to Kafka Connect’s classpath.

Restart your Kafka Connect process to pick up the new JARs.

Below are the steps:

  1. SSH to one of the nodes where a Kafka broker is configured.

  2. Create a folder “MSSQLconnect” in the Kafka home directory.

  3. Download the Debezium CDC release for MSSQL DB from its website. Below is the direct link for Debezium connector v1.0.3 (supports MSSQL 2017 and 2019).

    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.0.3.Final/debezium-connector-sqlserver-1.0.3.Final-plugin.tar.gz
    
  4. Untar the file:

    tar -xvf debezium-connector-sqlserver-1.0.3.Final-plugin.tar.gz
    
  5. Now copy all the jar files under the extracted debezium-connector-sqlserver folder to <kafka_home>/libs.

    debezium-api-1.9.4.Final.jar
    debezium-connector-sqlserver-1.9.4.Final.jar
    debezium-core-1.9.4.Final.jar
    failureaccess-1.0.1.jar
    guava-30.1.1-jre.jar
    mssql-jdbc-9.4.1.jre8.jar
    
  6. Configure the connection details. Edit the file connect-standalone.properties under the Kafka config directory:

    vi <kafka_home>/config/connect-standalone.properties

    Now edit the below properties:
    bootstrap.servers=<kafka_Broker1>:6667,<kafka_Broker2>:6667,<kafka_Broker3>:6667
    plugin.path=<kafka_home>/libs
    rest.port=<6669, check availability of the port>
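
Steps 4–5 can be sanity-checked with a short shell snippet that confirms the connector jars actually landed on the classpath directory (a sketch; the `KAFKA_HOME` default and the jar names are assumptions based on the listing above):

```shell
# Assumed install location; point this at your real Kafka home
KAFKA_HOME="${KAFKA_HOME:-/tmp/kafka_demo}"
mkdir -p "$KAFKA_HOME/libs"

# Simulate step 5 for the purpose of this check (in a real setup the
# jars are copied from the extracted connector folder)
touch "$KAFKA_HOME/libs/debezium-connector-sqlserver-1.0.3.Final.jar"
touch "$KAFKA_HOME/libs/mssql-jdbc-9.4.1.jre8.jar"

# Fail loudly if any expected jar is missing from the classpath dir
for jar in debezium-connector-sqlserver mssql-jdbc; do
  if ! ls "$KAFKA_HOME/libs" | grep -q "$jar"; then
    echo "missing: $jar"
    exit 1
  fi
done
echo "all connector jars present"
```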
    

Connector Configuration Files

Configure the connect-standalone.properties or connect-distributed.properties depending on the cluster setup.

Make the below settings in /usr/hdp/3.1.0.0-78/kafka/config:

plugin.path=/usr/hdp/3.1.0.0-78/kafka/libs
rest.port=6669

You also need to create a new connector properties file, which holds all the information related to the MSSQL Server database configuration. Additional properties are available and can be enabled as per the requirements.

Below are the minimum required details:

Example: /usr/hdp/3.1.0.0-78/kafka/config/connector.properties

name=<any name for the connector>
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=<address of the MSSQL server>
database.port=<MSSQL port, e.g. 1433>
database.user=<name of the SQL user that has the required privileges>
database.password=<password for the MSSQL user>
database.dbname=<name of the MSSQL database from which to capture changes>
database.server.name=<logical name of the MSSQL server/cluster; used as the Kafka topic prefix>
table.whitelist=<list of all tables whose changes Debezium should capture>
database.history.kafka.bootstrap.servers=<host:port list of the Kafka brokers the connector uses for the DB history topic>
database.history.kafka.topic=<name of the DB history topic>
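
For illustration, a filled-in connector.properties might look like the following (all host names, credentials, and topic names here are placeholders, not values from this environment):

```properties
name=mssql-connector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=mssqlhost.example.com
database.port=1433
database.user=debezium_user
database.password=changeme
database.dbname=MyDB
database.server.name=myserver
table.whitelist=dbo.MyTable
database.history.kafka.bootstrap.servers=kafkabroker1:6667
database.history.kafka.topic=dbhistory.myserver
```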

Setup in Kerberos and SSL Environment

For a Kerberos-enabled environment, the below needs to be set in connect-standalone.properties.

# Kerberos
sasl.mechanism=GSSAPI
producer.security.protocol=SASL_PLAINTEXT

Also update the connector properties file, which has all the information related to the MSSQL server database configuration. See Connector Configuration Files.

database.history.producer.security.protocol=SASL_PLAINTEXT
database.history.consumer.security.protocol=SASL_PLAINTEXT

For a Kerberos with SSL enabled environment, the below needs to be set in connect-standalone.properties.

# Kerberos
sasl.mechanism=GSSAPI
producer.security.protocol=SASL_SSL

Also update the connector properties file, which has all the information related to the MSSQL server database configuration. See Connector Configuration Files.

database.history.producer.security.protocol=SASL_SSL
database.history.consumer.security.protocol=SASL_SSL
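
Taken together for the SASL_SSL case, the two files carry the following settings (a consolidated sketch of the lines shown above; sasl.kerberos.service.name is a standard Kafka client setting you will typically also need, included here as an assumption about your environment):

```properties
# connect-standalone.properties
sasl.mechanism=GSSAPI
security.protocol=SASL_SSL
producer.security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka

# connector.properties
database.history.producer.security.protocol=SASL_SSL
database.history.consumer.security.protocol=SASL_SSL
```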

Start Connector and Verify the Setup

Start the Kafka connect process for MSSQL:

From the Kafka home directory, run the below command:

nohup bin/connect-standalone.sh config/connect-standalone.properties config/mssql_connector.properties &

Verify from the nohup.out log whether the process started successfully, and check for any ERRORs.

Now that the Kafka Connect process has been started and CDC is enabled on the MSSQL DB tables, the changes can be verified in the Gathr application.

Also, you can verify the DB row-level changes on the Kafka topic.

The SQL Server connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic.

The names of the Kafka topics follow the pattern serverName.schemaName.tableName, where:

  • serverName is the logical name of the connector as specified with the database.server.name configuration property.

  • schemaName is the name of the schema where the operation occurred.

  • tableName is the name of the database table on which the operation occurred.
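
As a quick illustration of the naming pattern, the topic for a given table can be assembled from those three parts (the server, schema, and table names below are placeholders):

```shell
# Placeholder values; substitute your connector's actual settings
server_name="myserver"   # from database.server.name
schema_name="dbo"
table_name="MyTable"

# Topic the connector writes this table's change events to
echo "${server_name}.${schema_name}.${table_name}"   # prints myserver.dbo.MyTable
```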
