Create CDC Application

To create a new application, click the plus icon on the top right side of the screen.

Select Template

In the Create Change Data Capture Application window, provide the name of the application.

Using Gathr, organizations can now ensure that high volumes of data are not just moved but processed before it is delivered.

You can easily create the CDC applications using one of the various templates available with Gathr platform. These are:

MySQL to S3/ GCS/ Snowflake/ Hive/ Hadoop/ MySQL/ PostgreSQL/ MsSQL/ Oracle

PostgreSQL to S3/ GCS/ Snowflake/ Hive/ Hadoop/ MySQL/ PostgreSQL/ MsSQL/ Oracle

MsSQL to S3/ GCS/ Snowflake/ Hive/ Hadoop/ ADLS/ Delta/ MySQL/ PostgreSQL/ MsSQL/ Oracle

Oracle to S3/ GCS/ Snowflake/ Hive/ Hadoop/ ADLS/ Delta/ MySQL/ PostgreSQL/ MsSQL/ Oracle/SingleStore

Teradata to S3

In MySQL to Hive, you capture data from the MySQL database in batch as well as in streaming modes and replicate the captured data in Hive. Once the application is completed the source and target have consistent data. Likewise, using S3 and Hadoop to capture data provides you with a highly efficient data-driven approach with security, management efficiency, and storage optimization.

Select the Template and click OK

Create_CDC_App

Once the template is selected, you will be configuring the properties of the above-mentioned template.

As you begin to create the application, you will be required to complete a few stages of creating the application including the source configuration, select source where you choose various database tables, select target and do column mapping to map the source and target table columns. There are also some optional configuration options including advance level configuration, job grouping, scheduling and jobs configuration that needs to be mentioned to create a CDC application.

Select Source

In Source configuration you will be required to configure the source database connection and the method to capture change data.

Configure Source Connection

You are required to choose the connection details of the SOURCE that you have selected.

CDC_Source

In this step, select a Connection and a Method

FieldDescription
Database Connection

Choose connection in which your table exists, for which you want to capture CDC.

This tab has an info Icon, which shows the connection details.

Shown in the figure below.

Method

Method of capturing CDC.

Agent: Debezium is a service that captures change data from database in real-time and publish those changes to the Kafka topic, so that downstream applications can consume that data and perform the operations on it.

Incremental Read: Captures all the records in an incremental fashion and will capture only insert and update records.

LogMiner: In this method, the application directly accesses Oracle redo logs, which comprises of the entire set of records of all the activities performed on the database.

If you are creating a CDC application using Oracle to MySQL/ Oracle to MsSQL/ Oracle to Postgres/ Oracle to SingleStore/ Oracle to Oracle, you will have Logminer (Oracle), Logminer (Oracle) with Kafka and Incremental Read options available under Method under the Select Source page.

Kafka ConnectionChoose the Kafka connection where your database agent is writing data.
Read ModeChoose whether you want to create application in batch or streaming mode.

Database Connection Detail

connectiondetails

Kafka Connection Detail:

Kafka-connectiondetails

Validate Configuration

Next, validate the required access by clicking on the Validate icon on the top right corner of the Source configuration Window.

validationSucc

Here, you request to validate the connection, required privileges, ensure enabling the CDC and the Kafka connection.

Once the validation is successful, click the Next icon to configure the source table.

Select Source

In the Select Source Panel, you will have an accordion view of different databases of the source.

CDC_Select_Source

As you expand the accordion you can see the database tables. Click on a specific database and from within the database and choose specific tables. Once the selection is made, click Next

Select Target

In the Select Target configuration, choose the target connection where you need to write the data. You may view the details of the connection by clicking on the View Details icon (i) option next to Connection.

Select Target will be prompted to select the connection for the target and select target path in case of HDFS and S3 and table in case of Hive.

Also, if you select Agent as a source method, then you will also be asked for a Kafka Topic from where you read the data. You will have to identify the topic name for the individual table name from the debezium agent configuration.

This wizard gives you an option to choose the Target Table where you want to write the data that is captured from the Source Table.

As you click on one of the Source Tables you can see the schema and the table view where you may choose a specific table within the schema.

CDC_Select_Target

FieldDescription
ConnectionIt is the connection of the target where you want to write data of source table.
Source TablesIt is the table of source database from where you want to read the data.
Kafka TopicIt is the drop-down of Kafka topics. Here user have to choose one of the topics where Debezium database agent is configured to write the changed data. This will be visible only for Agent method.
Target TablesIt is the table from target connection. User will configure this table by clicking on the row.
Output FormatIt is the format in of the selected target table.
Output DelimiterIt is the field delimiter for the target table data.

 

Select Target Table

Now you can map the source table to the target table.

SelectTable2

In the Target Table window, you can also create new tables, by clicking on the plus icon.

You will be required to Add Column for Version and Delete Flag for SCD Type 2 in the Create New Table window. 

 

targetTable-Create

CREATE TABLE BOMBAY13234456 (USER_ID timestamp null default NULL ,FIRST_NAME timestamp null  default NULL ,LAST_NAME varchar(100) ,EMAIL varchar(100) ,MODIFIED_DATE timestamp null default NULL)

Close the right navigation pane and click the Next button to configure the Column Mapping.

Column Mapping

After configuring the target, you will be navigated to the Column Mapping section, where the columns of both the source and target will be mapped.

CDC_Column_Mapping

The column mapping window has two major sections, one is list of Source Table, their corresponding Target Table, SCD Type, version column and deleted flag column, and the other is where the Target table columns will open for you to map.

FieldDescription
Source TableSource Table takes the table for which you want to capture CDC.
Target TableTarget Table takes the table for which you want to capture Source table CDC.
SCD TypeSelect type you want to use to capture CDC.
Type 1 will contain only the updated records while selecting.
Type 2 will maintain the whole history.
Explained below in detail.

SCD Type, Version Column and Deleted Flag Column

FieldDescription
SCD Type

Under SCD Type there are two types, i.e., Slowly Changing Dimension type to streamline your data-flow, Type 1 and Type 2.

Enter SCD Type for each source and target table mapping.

In case of Type 1, your target table will only contain updated records and in Type 2, the target table will maintain the entire history of the source.

Thus, you need to have two additional columns in case of Type2. One of the columns will maintain version of the record and the other will maintain DELETED_FLAG. Here, the deleted flag identifies if the record is deleted or not. For SCD Type 2, select the Version and Delete Flag option from the drop down.

Note: The Version and Delete Flag options will be available once the user has create them in the column mapping window as described above. In case of Snowflake, the Current Flag option needs to be provided under SCD Type 2. For the Insert and Update operations on records the deleted flag value will be false and for the delete operation the value will be true.

Version KeyTarget Column is the corresponding column to Source column for which you want to capture Source table CDC.
Deleted Flag KeyChoose the Deleted Flag key from the ‘Column Type’ column by clicking on the fourth Deleted Flag key icon. Here, the deleted flag represents the current state of record. It identifies if the record is deleted or not.

Map Source and Target Table Columns

In the Column Mapping Section, the columns of both the source and target can be mapped by you. The source can be mapped with the corresponding columns of the target table.

There could be a case when you have multiple columns in your source table but you require to select a few columns and map them to the target table. In this wizard you can do that easily.

There could be a scenario when the source table column names are not like that of the target table names. To map those, you can simply drag and drop the columns against each other and map them. You will have to choose the destination connection on which you want to store the changed data captured from source. And choose the Schema name for the target table that will store the changed data captured from the source.

You may also click on the auto map button to map similar name source column with the target column.

FieldDescription
Source ColumnSource Columns is the specific column from the source table for which you want to capture CDC.
Column to CheckSelect an integer, long, date, timestamp or decimal type column on which incremental read will work. It is recommended that this column should have sequential, unique, and sorted (in increasing order) values.
Target ColumnTarget Column is the corresponding column to Source column for which you want to capture Source table CDC.
Column TypesPartitioning Key:
Choose the Partition key from the ‘Column Type’ column by clicking on the first Partition key icon. The Partition key is the prime factor for determining how the data is stored in the table. The key helps you to organize the tables into partitions by dividing them into different parts.
Join Key:
Choose the Join key from the ‘Column Type’ column by clicking on the second Join key icon. Here, the Join key helps in incrementing the version of record by identifying the availability of similar record in the target table.

Advanced Configuration

Once you click on the Next button, you are ready for the Advanced Configuration. This is an optional high-level configuration for the data flow created in your application needed for performance optimization and monitoring.

Source Configuration

The source configuration is different for different CDC Methods selected. Explained below are the methods: Agent (for MySQL, MSSQL and Postgres), LogMiner and LogMiner with Kafka (Oracle source)Incremental Read (for all databases).

Configuration for Database Agent Method

CDC_Advance_Configuration

FieldDescription
PartitionsNumber of partitions. Each partition is an ordered unchangeable sequence of message that is repeatedly added to a commit log.
Replication FactorNumber of replications. Replication provides stronger durability and higher availability. For example, a topic with replication factor N can tolerate up to N-1 server failures without losing any messages committed to the log.
Define OffsetFollowing configurations are used for Kafka offset.
Latest: The starting point of the query is just from the latest offset.
Earliest: The starting point of the query is from the starting /first offset.
Custom: A json string specifying a starting and ending offset for each partition.
startingOffsets: A JSON string specifying a starting offset for each partition i.e. {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}}
endingOffsets: A JSON string specifying a ending offset for each partition. This is an optional property with default value “latest”.i.e. {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1
Connection RetriesThe number of retries for component connection. Possible values are -1, 0 or any positive number. If the value is -1 then there would be infinite retries for infinite connection.
Fail on Data LossProvides option of query failure in case of data loss. (For example, topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn’t work as you expected. Batch queries will always fail, if it fails to read any data from the provided offsets due to data loss.
Delay Between Connection RetriesRetry delay interval for component connection (in milliseconds).
Is Schema EnabledWhile running the Debezium connector, you have to set the schema.enable property as true, then the checkbox isSchemaEnable must be check marked.

Configuration for Incremental Read Method

FieldDescription
Column To CheckSelect a column on which incremental read will work. Displays the list of columns that has integer, long, date, timestamp, decimal type of values. It is recommended that this column should have sequential, unique, and sorted (in increasing order) values.
Start ValueYou can specify the value from where you want to begin reading data from the source table.
Read Control TypeYou can control the reading of data from database by utilizing three different approaches.
Limit by Count: Here, you can specify the number of records that you want to read in a single request.
Limit by Value: Here, you can specify the value for configured column till the point where you want to read the data.
None: Here, you can read the data without specifying any limit.
From what value of columnToCheck, the data to be read.
Enable PartitioningRead parallel data from the running query.
No. of PartitionsSpecifies no of parallel threads to be invoked to read from JDBC in spark.
Partition on ColumnPartitioning column can be any column of type Integer, on which spark will perform partitioning to read data in parallel.
Lower Bound/Upper BoundValue of the lower bound for partitioning column/ Value of the upper bound for partitioning column.

Configuration For LogMiner Method

FieldDescription
Use Pluggable DatabaseSelect the option for 12c multi-tenant databases.
Pluggable Database NameName of the pluggable database that contains the schema you want to use. Use only when the schema was created in a pluggable database.
Source ConfigurationUnder the Source Configuration, the below fields are available:
Operations

Operations for creating records.

INSERT

DELETE

UPDATE

Database Time ZoneTime zone of the database. When the database operates in a different time zone from Data Collector.
Logminer Session ParametersUnder Logminer Session Parameters, the below fields are available:
Maximum Transaction TimeTime in seconds to wait for changes for a transaction. Enter the longest period that you expect a transaction to require. Default is 60 seconds.
Log Miner Session WindowTime in seconds to keep a LogMiner session open. Set to larger than the maximum transaction length. Reduce when not using local buffering to reduce LogMiner resource use. Default is 7200 seconds.
Tunning ParametersUnder Tunning Parameters below fields are available:
Max Batch Size (records)Maximum number of records processed at one time. Keep values up to the Data Collector maximum batch size. Default is 1000.
Fetch SizeMinimum number of records to fetch before passing a batch to the pipeline. Keep this value low to allow the origin to pass records to the pipeline as soon as possible, rather than waiting for a larger number of records to become available. Lower values can increase throughput when writes to the destination system are slow. Default is 1.
OffsetUnder offset, the below fields are available:
Start From

Start from is the point in the LogMiner redo logs where you want to start processing. When you start the pipeline, Oracle CDC Client starts processing from the specified initial change and continues until you stop the pipeline.

1. From the latest change

The origin processes all change that occur after you start the pipeline.

2. From a specified datetime:

The origin processes all change that occurred at the specified datetime and later. Use the following format: DD-MM-YYYY HH:mm:ss.

3. From a specified system change number (SCN)

The origin processes all change that occurred in the specified SCN and later. When using the specified SCN, the origin starts processing with the timestamps associated with the SCN. If the SCN cannot be found in the redo logs, the origin continues reading from the next higher SCN that is available in the redo logs.

In case the Date option is selected then If future date/time are selected, it will be accepted only once the current time passes over the selected start date/time.
Query TimeoutTime to wait before timing out a LogMiner query. Default is 300 seconds.

Configuration for LogMiner with Kafka (Oracle source)

FieldDescription
Operations

Operations for creating records.

INSERT

DELETE

UPDATE

Query IntervalProvide query interval in seconds.
Tunning ParametersUnder Tunning Parameters below fields are available:
Fetch SizeMinimum number of records to fetch before passing a batch to the pipeline. Keep this value low to allow the origin to pass records to the pipeline as soon as possible, rather than waiting for a larger number of records to become available. Lower values can increase throughput when writes to the destination system are slow. Default is 1.
Query TimeoutTime to wait before timing out a logminer query. Default is 500 seconds.

The Source Configuration will have the below details:

FieldDescription
PartitionsNumber of partitions to create for a topic. Each partition is an ordered immutable sequence of messages that is continually appended to a commit log.
Replication FactorFor a topic, with replication factor N, Kafka will tolerate upto 9-1 server failures without loosing any messages commited to the log.
Define OffsetSelect one of the options available in the drop-down list: Earliest, Latest, Custom.
Starting OffsetsThe start point when a query is started. These offset options are: ‘Earliest’ which is from the earliest offset. ‘Latest’ which is just from the atest offset. Or a JSON string that specifies a starting offset for each TopicPartition. In JSON string -2 offset refers to Earliest while -1 refers to Latest. This option is availabe is Define Offset is selected as Custom.
Ending OffsetsThe end point when a batch query is ended. Here, Latest offset refers to latest while a JSON string specifies an ending offset for each topic partition. In JSON string -1 offset refers to Latest while -2 (Earliest) offset is not allowed. Default value is Latest. This option is availabe is Define Offset is selected as Custom.
Maximum offset per triggerThe maximum offset value is to be considered in each trigger interval.
Connection RetriesThe number of retries for component connections. Possible values are -1, 0 or positive numbers. -1 imply infinite retries.
Fail On Data LossOption to select True or False for failure on data loss as data could be lost in scenarios such as when topics are deleted, or offsets are out of range. Applicable for streaming data only.
Delay Between Connection RetriesDefines the retry delay interval (in milliseconds) for component connection.

Target Configuration

Target Configuration varies for Hadoop, S3 and Hive.

Configuration for Hadoop

targetconfig-hadoop

FieldDescription
Monitor Operation StatisticsSelect this box to monitor each run statistics.
Block SizeSize of each block (in Bytes) allocated in HDFS.
ReplicationEnables to make additional copies of data.
Compression TypeAlgorithm used to compress the data.
Enable TriggerTrigger defines how frequently a streaming query should be executed.
Processing TimeIt will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval.
Processing Time UnitProcessing Time Unit is in minutes or seconds.

Configuration for Hive

targetconfig-hive

FieldDescription
ReplicationEnables to copy your data on underlying Hadoop file system. For example, if you specify “2” as Replication, then two copies will be created on HDFS.

Configuration for S3

targetconfig-s3

FieldDescription
Monitor Operation StatisticsMonitor Data Manipulation operations such as insert, update and delete functions performed on data of the source tables.
S3 protocolS3 protocol to be used while writing on S3.

Pipeline Grouping Section

In the jobs grouping page, you can group the jobs into a single unit. To create group, specify the group name and click Create. This means the dataflow that you have created can be grouped into a single pipeline or into multiple pipelines.

In certain cases, you may want to combine multiple ‘source to target’ jobs into one group. You can do this easily by assigning a group name to multiple jobs.

Here, a group means a ‘single pipeline’. In Gathr, the unit which performs the operations is called a pipeline. Thus, having two separate groups means having two different pipelines.

CDC_Pipeline_Grouping

After creating group, click next.

Scheduling

In the scheduling page, you can schedule the batch jobs. In certain business requirements, where you need to run your job at mid night, you may do that easily in this wizard.

CDC_Scheduling

Jobs Configuration

Next, in the jobs configuration page, you can allocate the resources to the jobs. You may configure the parameters at which you would want to run this application. Once you finish doing the configuration, click done to create application.         

CDC_jobsConfig

FieldDescription
Log Level

Control the logs generated by the pipeline based on the selected log level.

Trace: View information of trace log level.

Debug: View information of debug and trace log levels.

Info: View information of trace, debug and info log levels.

Warn: View information of trace, debug, warn and info log levels.

Error: View information of trace, debug, warn, info and error log levels.

Deployment Mode

Specifies the deployment mode of the pipeline.

Cluster: In cluster mode, driver runs on any of the worker nodes.

Client: In client mode, driver runs on the Gathr admin node.

Local: In local mode, driver and the executors run on the Gathr admin container.

Driver CoresNumber of cores to use for the driver process.
Driver MemoryAmount of memory to use for the driver process
Executor CoresNumber of cores to use on each executor.
Executor InstancesNumber of executors to use for pipeline.
Executor MemoryAmount of memory to use per executor process
Yarn Queue(for Yarn deployment mode)Name of the queue to use within pipeline for execution.
HDFS User(for Yarn deployment mode)Name of the HDFS user to use within pipeline for accessing the hadoop services.
Error HandlerYou can enable/disable error handling in job using this configuration.
Error Log TargetSelect the target where you want to move the data that failed to process in the pipeline. If you select RabbitMQ, or Kafka, then you will have to specify the error target connection.
ConnectionSelect the connection where failed data would be pushed.
KeyTab OptionIt signifies the mode of providing the Keytab file for authentication. There are two way through which user can supply the keytab file, either by specifying the key tab file path(file should present on each node) or by uploading it.
KeyTab File PathIt allow users to specify the key tab file path.
Upload keytab fileIt allow users to upload the key tab file.
Dynamic Allocation EnabledWhen a pipeline is in running mode, the spark pipeline scale and scale down the number of executors at the runtime. (Only in case of CDH enabled environment).
Extra Driver Java OptionsA string of extra JVM options to pass to the driver. For instance, GC settings or other logging. For example: -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
Extra Executor Java OptionsA string of extra JVM options to pass to executors. For instance, GC settings or other logging. For example: -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
Extra Spark Submit OptionsA string with –conf option for passing all the above configuration to a spark application. For example: –conf ‘spark.executor.extraJavaOptions=-Dcon-fig.resource=app’ –conf ‘spark.driver.extraJavaOptions=-Dconfig.resource=app’

Click Done to create the application.

For Logminer with Kafka you will have the below fields:

FieldDescription
Start FromSelect one of the options available in the drop-down: LATEST, SCN, DATE.
FieldDescription
LATESTSelect LATEST for pushing the data after the pipeline gets active to be able to view the data in the output emitter.

Upon selecting SCN, the below field is available:

FieldDescription
SCNThe origin processes all changes that occurred in the specified SCN and later. When using the specified SCN and later. When using the specified SCN, the origin starts processing with the timestamp associated with the SCN. If the SCN cannot be found in the redo logs, the origin continues reading from the next higher SCN that is available in the redo logs.

Upon selecting DATE, the below field is available:

FieldDescription
Initial DateThe date on specific source will get emitted based on the date (staring from) and time selected on Date Method type. Provide the value for initial date. ex., 10/17/2023, 11:05 AM

You can view the created application on the dashboard.

The created application tile displays the name of the CDC application, application type, total number of pipelines, total active, stopped and error pipelines running within the CDC application.

CDC_Pipeline_Listing

If you are creating CDC application as Oracle to Postgres, Oracle to SingleStore, Oracle to Oracle, Oracle to Mysql and Orace to Mssql then the below scenario will be available in the source window, Select Target window, Advance Configuration window, Jobs Configuration window.

If the Source is selected as Oracle, then the Source window while creating CDC application will have Method as: Logminer (Oracle), Logminer (Oracle) with Kafka, Incremental Read. You also have an option to check Use Pluggable Database. If checked, then provide Pluggable Database Name.

oracle

Also, Kafka configuration check will be there for configuration validation.

oracle_validate

Select Source window is as it is as explained above in MySql to Hive CDC application creation flow.

While creating CDC application, using Oracle as source, on the Select Target window, Kafka Topic will be auto populated on user-interface. Kafka topic takes the topic name on which your CDC records are getting published.

target_kafka_topic

Advance Configuration window:

oracle_advance_config

oracle_advanceconfig_2

Jobs configurations window will have Start From Option. You can select Latest, SCN or Date from the drop-down. You can keep pushing the data after the pipeline gets active to be able to view the data in the output emitter.

oracle_jobs_config

Top