Data Sources

A Data Source represents the source for a data pipeline in Gathr.

Typically, you start creating a pipeline by selecting a Data Source for reading data. A Data Source also helps you infer the schema, either directly from the selected source or by uploading a sample data file (via the Schema Type tab).

Gathr Data Sources are built-in drag-and-drop operators. The incoming data can be in any form, such as message queues, transactional databases, log files, and many more.

Gathr runs on Spark as its computation engine.

Within Spark, Data Sources behave in one of two ways:

Streaming Data Sources

Batch Data Sources

Pre-Actions

The user may want to perform certain actions before the execution of the source components. A Pre-Action tab is available on the source.

The following actions can be performed on the source:

Field

Description

SQL

The user can execute SQL from the actions option by selecting the connection on which the SQL should run and providing the query to execute.

Shell Script

The user can invoke a Shell Script as an Action.

There are two ways to provide the shell script to invoke:

- Write the shell script in the inline editor.

- Upload a shell script file to execute it.

Stored Procedure

With this option, the user can execute a stored procedure or function stored in a database.

To do so, the user has to select an appropriate JDBC connection.

Then, the relevant stored procedure must be selected from the drop-down menu.


Note:

- If the configured action fails to execute, the user can check the ‘Ignore Action Error’ option so that the pipeline runs without getting impacted.

- By checking the ‘Ignore while execution’ option, the configuration will remain intact in the pipeline, but the configured action will not get executed.

- The user can also configure multiple actions by clicking the Add Action button.


Auto Schema

This feature helps define the schema while loading the data: with Auto Schema, you first load data from a source, then infer the schema, modify data types, and determine the schema on the go.

This feature helps design the pipeline interactively. Data can be loaded in the form of CSV, TEXT, JSON, XML, Fixed Length, or Parquet files, or you can fetch data from sources such as Kafka, JDBC, and Hive. Auto Schema enables the creation of the schema within the pre-built operators and identifies columns in CSV/JSON/TEXT/XML/Fixed Length and Parquet.

Gathr starts the inference by ingesting data from the data source. As each component processes data, the schema is updated (as per the configuration and the logic applied). During the process, Gathr examines each field and attempts to assign a data type to that field based on the values in the data.

All source components have Auto Schema enabled. Below is the common Auto Schema configuration for every Data Source.

To add a Data Source to your pipeline, drag the Data Source to the canvas and right click on it to configure.

The following tabs are available for configuring a component:

1.    Schema Type

2.    Configuration

3.    Incremental Read

4.    Add Notes

Note: An additional tab (Detect Schema) is also reflected. This tab presents your data in the form of a schema.

Schema Type

Schema Type allows you to create a schema and the fields. On the Schema Type tab, select either of the below mentioned options:

Fetch from Source

Upload Data File

Use Existing Dataset

Whether the data is fetched from the source or uploaded, the following configuration properties remain the same:

Field

Description

Type Of Data

Input format of Data.

Max no. of Rows

Maximum number of sample rows to pull from the streaming source.

Trigger time for Sample

Minimum wait time before the system fires a query to fetch data. Example: if it is 15 secs, the system will first wait for 15 secs, then fetch all the rows available and create a dataset out of them.

Sampling Method

Dictates how to extract sample records from the complete data fetched from the source.

Following are the ways:

-Top N: Extract top n records using limit() on dataset.

-Random Sample: Extract random sample records by applying a sample transformation and then limiting to the maximum number of rows in the dataset.


Note: In case of ClickStream Data Source, you can also choose from an existing schema.

Fetch from Source

Fetch data from any data source.

Determine the Data Source you want to use, then identify the type of data (or data format) and its corresponding delimiters.

Gathr supports the following data formats for schema extraction: CSV, JSON, TEXT, XML, Fixed Length, and Parquet.

Once you choose a data type, you can edit the schema and then configure the component as per your requirement.

Note: Whenever Fetch from Source is chosen, the sequence of tabs will be as follows:

Fetch from Source > Configuration > Schema > Add Notes.

CSV

The data that is being fetched from the source is in the CSV format. The data columns parsed from within the CSV accept the following delimiters.

Tab

, Comma

: Colon

; Semi Colon

| Pipe

The default delimiter is comma (,).

When the data is fetched from the source (as CSV/TEXT/JSON/Parquet), the next tab is Configuration, where the Data Source is configured and the schema is generated accordingly. Then you can add notes and save the Data Source’s configuration.

JSON

Select JSON as your Type of Data. The data that is being fetched from the source is in JSON format. The source will read the data in the format it is available.

XML

The System will fetch the incoming XML data from the source.

Select XML as your Type of Data and provide the XML XPath value.

XML XPath - The path of the tag of your XML data to treat as a row. For example, in the XML <books><book>...</book></books>, the appropriate XML XPath value for book would be /books/book.

The default value of XML XPath is '/', which parses the whole XML data.
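For instance, given the following illustrative input, an XML XPath of /books/book treats each <book> element as one row:

<books>
  <book><title>First</title></book>
  <book><title>Second</title></book>
</books>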

Fixed Length

The system will parse the incoming fixed length data. Select 'Fixed Length' as your Type of Data and provide the Field Length value.

The Field length value is a comma separated length of each field.

For Example:

If there are 3 fields f1, f2, and f3 whose maximum lengths are 4, 10, and 6 respectively, then the field length value for this data would be 4,10,6. For instance, with these lengths the row 'John2023-01-05Mumbai' would be split into 'John', '2023-01-05', and 'Mumbai'.

Parquet

Select Parquet as your Type of data. The file that is being fetched from the source is in parquet format. The source will read the data in the format it is available.

Text

The Text message parser is used to read data in any format from the source, including formats that cannot be handled when the CSV, JSON, etc. parsers are selected in configuration. The data is read as text, into a single-column dataset.

When you select TEXT as the “Type of Data” in configuration, Gathr reads data from the source into a single-column dataset.

To parse the data into fields, append a Custom Processor to the Data Source, where custom logic for data parsing can be added.

Upload Data File

Upload your data either by using a CSV, TEXT, JSON, XML, Fixed Length or a Parquet file.

Once you choose a data type, you can edit the schema and then configure the component as per your requirement:

Note: Whenever you choose Upload Data File, the sequence of tabs will be as follows:

Upload Data File > Detect Schema > Configuration > Add Notes.

CSV

You can choose the data type as CSV; the supported delimiters are as follows:

• Comma

• Tab

• Colon

• Semi Colon

• Pipe

Once you choose CSV, the schema is uploaded.

After the CSV/TEXT/JSON/XML/Fixed Length file is uploaded, the next tab is Detect Schema, where the schema can be edited as per requirements. Next is the Configuration tab, where the Data Source is configured as per the schema. Then you can add notes and save the Data Source’s configuration.

Detect Schema

The following fields in the schema are editable:

Note: Nested JSON cannot be edited.

Schema Name: Name of the Schema can be changed.

Column Alias: Column alias can be renamed.

Date and Timestamp: Formats of dates and timestamps can be changed. Note: Gathr has migrated from Spark 2 to Spark 3.0. For further details on upgrading to Spark 3.0, refer to the link shared below:

https://spark.apache.org/docs/3.0.0-preview2/sql-migration-guide.html

Data Type: Field type can be any one of the following and they are editable:

String

Integer

Long

Short

Double

Float

Byte

Boolean

Binary

Vector

JSON

When you select JSON as your type of data, the JSON is uploaded in the Detect Schema tab, where you can edit the Key-Value pair.

After the schema is finalized, the next tab is the configuration of the Data Source (the configurations of every Data Source are described below).

Note: In case of Hive and JDBC, the configuration tab appears before the Detect schema tab.

TEXT

Text files can be uploaded to a source when the available data requires customized parsing. The Text message parser is used to read data in any format from the source, including formats that cannot be handled when the CSV, JSON, etc. parsers are selected in configuration.

To parse the data into fields, append a Custom Processor to the Data Source, where custom logic for data parsing can be added.

XML

The system will parse the incoming XML data from the uploaded file. Select XML as your Type of Data and provide the XML XPath of the XML tag to treat as a row.

For example, in the XML <books><book>...</book></books>, the appropriate value would be /books/book.

You can also provide '/' in the XML XPath; it is generated by default and will parse the whole XML.

Fixed Length

The system will parse the incoming fixed length data. If the field has data in continuation without delimiters, you can separate the data using the Fixed Length data type.

Provide the length of each field (a numeric value), separated by commas, in the Field Length.

Parquet

When Parquet is selected as the type of data, the file is uploaded in the schema and the fields are editable.

Note: Parquet is only available in HDFS and Native DFS Receiver.

Auto schema infers headers by comparing the first row of the file with other rows in the data set. If the first line contains only strings, and the other lines do not, auto schema assumes that the first row is a header row.

After the schema is finalized, the next tab is configuration. Once you configure the Data Source, you can add notes and save the Data Source’s configuration.

Use Existing Dataset

Along with fetching data from a source directly or uploading data to the data source, you can also use an existing dataset in the RabbitMQ and DFS data sources.

To read more about it, refer to “Use Existing Dataset” in the Dataset section.

ADLS (Batch and Streaming)

Add an ADLS batch or streaming data source to create a pipeline. Click the component to configure it.

Under the Schema Type tab, select Fetch From Source or Upload Data File. Edit the schema if required and click next to configure.

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Container

Provide the container name in Azure Blob Storage.

ADLS Directory Path

Provide directory path for ADLS file system.

ADD CONFIGURATIONS

The user can add further configurations (optional).

Environment Params

The user can add further environment parameters (optional).

Once the above fields are provided to configure the ADLS data source, click Next for the Incremental Read option.

Note: The Incremental Read option is available only for ADLS Batch.

Field

Description

Enable Incremental Read

Unchecked by default, check mark this option to enable incremental read support.

Read By

Option to read data incrementally either by choosing the File Modification Time option or Column Partition option.

Upon selecting the File Modification Time option provide the below detail:

Offset

Records with a timestamp value greater than the specified datetime (in UTC) will be fetched. After each pipeline run, the datetime configuration will be set to the most recent timestamp value from the last fetched records. The given value should be in UTC with the ISO date format yyyy-MM-dd'T'HH:mm:ss.SSSZZZ. Ex: 2021-12-24T13:20:54.825+0000. (A formatting sketch follows this list.)

Upon selecting the Column Partition option provide the below details:

Column

Select the column for incremental read. The listed columns can be integer, long, date, timestamp, decimal, etc.

Note: The selected column should have sequential, sorted (in increasing order) and unique values.

Start Value

Mention the value of reference column. Only the records whose value of the reference column is greater than this value will be read.

Read Control Type

Provides three options to control the data to be fetched - None, Limit By Count, and Limit By Value.

None: All the records with the value of the reference column greater than the offset will be read.

Limit By Count: Only the mentioned number of records with the value of the reference column greater than the offset will be read.

Limit By Value: All the records with the value of the reference column greater than the offset and less than the Column Value field will be read.

For None and Limit By Count, it is recommended that the table have data in sequential and sorted (increasing) order.
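The Offset value above must be typed in UTC using the yyyy-MM-dd'T'HH:mm:ss.SSSZZZ pattern. The following is a minimal sketch (plain Java, not part of Gathr) for producing such a value:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class OffsetValue {
    public static void main(String[] args) {
        // Formats the current instant in UTC, e.g. 2021-12-24T13:20:54.825+0000,
        // matching the documented offset format.
        DateTimeFormatter formatter = DateTimeFormatter
                .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
                .withZone(ZoneOffset.UTC);
        System.out.println(formatter.format(Instant.now()));
    }
}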


Advanced Mongo

Add an Advanced Mongo data source into your pipeline. Drag the data source to the canvas and click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File. Edit the schema if required and click next to Configure the Advanced Mongo source.

Configuring Advanced Mongo

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Database Name

Select the Mongo DB database source from which the data is to be fetched.

Collection Name

Select the name of the database collection that needs to be scanned.

Query

Filtering criteria option to choose between All Data or Match Query.

If Match Query option is selected, provide the below detail:

Filter/Query

Fetch filtered schema or read data from source as per the filter condition or query provided.

Note:

In Match Query, use only single quotes (‘’) where required.

Records Per Partition

Number of records to be read per partition.

Schema Updated Alert

Check the checkbox to receive alerts for any schema changes when the data is fetched from the source.

Add Configuration

Option to add further configuration.


Incremental Read in Advanced Mongo

Field

Description

Read Type

Option to fetch data from the source. Full Load, Incremental and CDC options are available to fetch data as explained below:

Full Load

Reads all the records as per the configured collection from the database during the pipeline execution.

Incremental

Reads records as per the specified offset(s) or start value from the database during pipeline execution. Provide the below details:

Column

Select the column for incremental read. The listed columns can be integer, long, date, timestamp, decimal, etc.

Note: The selected column should have sequential, sorted (in increasing order) and unique values.

Column Includes Date

Check-mark the check-box if the column to be read is of date/timestamp type.

Start Value

An offset value needs to be set for the incremental read that will be done on the selected column.

Only those column records with values greater than the offset value will be read.

Read Control Type

Provides three options to control data fetched - None, Limit by Count, and Limit by Value.

None: All the records in the reference column with values greater than the Start Value will be read.

Limit by Count: Only the mentioned number of records in the reference column with their values greater than the Start Value will be read.

Provide No. of Records.

Limit by Value: All the records in the reference column with

values greater than the Start Value but less than/equal to the Column Value that you set will be read (Column Value is inclusive).

Set the Column Value.

For None and Limit by Count, it is recommended that the table should have data in sequential and sorted (increasing) order.

CDC

Reads the records from the configured namespace or the Oplog namespace as per the specified CDC configuration during the pipeline execution.

Oplog Database

Select the Oplog Database from where the data should be read.

Oplog Collection

Select the Oplog Collection from where the data should be read.

Load From Original Collection

This option is checked by default during the first-time configuration. If the check box is unchecked, the records are read from the Oplog collection as per the specified CDC configuration during the pipeline execution. It will get automatically disabled after the pipeline is successfully executed.

If this option is unchecked, then provide the below field:

Offset

Records with a timestamp value greater than the specified datetime (in UTC) will be fetched. After each pipeline run, the datetime configuration will be set to the most recent timestamp value from the last fetched records. The given value should be in UTC with the ISO date format yyyy-MM-dd'T'HH:mm:ss.SSSZZZ. Ex: 2021-12-24T13:20:54.825+0000.



Configure Pre-Action in Source

Attunity

Configure Data Source

Configuring an Attunity Data Source

To add an Attunity Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File. Edit the schema if required and click next to Configure Attunity.

Configuring Attunity

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Capture

Data: Flag to capture data.

Metadata: Flag to capture metadata.

You can also select both options.

Define Offset

This configuration is similar to the one used for Kafka offsets.

Latest: The starting point of the query is the latest offset.

Earliest: The starting point of the query is the first/earliest offset.

Connection Retries

The number of retries for the component connection. Possible values are -1, 0, or any positive number. A value of -1 means infinite retries for the connection.

Delay Between Connection Retries

Retry delay interval for component connection. (In milliseconds.)

Add Configuration

To add additional custom Kafka properties in key-value pairs.


Click on the add notes tab. Enter the notes in the space provided.

Configuring the Data Topics and Metadata Topic tabs

Choose the topic names; their fields are populated and are editable. You can choose as many topics as needed.

Choose the metadata topic; its fields are populated and are editable. You can choose the metadata of only one topic.

Field

Description

Topic Name

Topic name from where consumer will read the messages.

ZK ID

Zookeeper path to store the offset value on a per-consumer basis. An offset is the position of the consumer in the log.


Click Done to save the configuration.

AWS IoT

AWS IoT and Gathr allow collecting telemetry data from multiple devices and processing that data.

Note: Every action we perform on Gathr is reflected on AWS-IoT Wizard and vice versa.

Configuring an AWS IoT

To add an IoT channel into your pipeline, drag the channel to the canvas and right click on it to configure.

Schema Type

Under the Schema Type tab, select Fetch From Source or Upload Data File.

Fetch From Source takes you to the Configuration of AWS IoT.

Upload Data File takes you to the Detect Schema page.

Click on the add notes tab. Enter the notes in the space provided.

Azure Blob Batch

An Azure Blob Batch channel reads different formats of data in batch (JSON, CSV, ORC, Parquet) from a container. It can emit data into any emitter.

Configuring an Azure Blob Data Source

To add an Azure Blob Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, you can Upload Data File and Fetch From Source.

Field

Description

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Container

Container name in Azure Blob.

Path

End the path with * in case of a directory. For example: outdir.*

For an absolute path: outdir/filename

Add Configuration

To add additional custom properties in key-value pairs.


Configure Pre-Action in Source

Azure Blob Stream

An Azure Blob Stream channel reads different formats of streaming data (JSON, CSV, ORC, Parquet) from a container and emits data into different containers.

Configuring an Azure Blob Data Source

To add an Azure Blob Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, you can Upload Data File and Fetch From Source.

Field

Description

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Container

Container name in Azure Blob.

Path

End the path with * in case of a directory. For example: outdir/*

If expecting a specific format in the directory path: outdir/*.csv

Add Configuration

To add additional custom properties in key-value pairs.


Batch Cosmos

On the Cosmos channel, you can read from a selected container of a database in batches. You can use a custom query in case of the batch channel.

Configuring a Batch Cosmos Data Source

To add a Batch Cosmos Data Source into your pipeline, drag the Data Source to the canvas and click on it to configure.

Under the Schema Type tab, you can Upload Data File and Fetch From Source.

Field

Description

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

Key

Provide the Azure Cosmos DB key. Click TEST Connection to test the execution of the connection.

Database

Select the Cosmos Database from the drop-down list.

Container

Select the Cosmos container from the drop-down list.

Enable Change Feed

Change feed enables reading the latest or changed records. It takes effect only during the pipeline flow.

Change Feed From Beginning

If set to True, data will be read from the beginning.

CosmosDB Checkpoint Directory

It is the file path where Cosmos stores the checkpoint data for Change feed.

ADD CONFIGURATION

To add additional custom properties in key-value pairs.


Configure Pre-Action in Source

Click Next for Incremental Read option

Field

Description

Enable Incremental Read

Check this checkbox to enable the incremental read support.

Column to Check

Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, and decimal types of values.

Start Value

Mention the value of reference column. Only the records whose value of the reference column is greater than this value will be read.

Read Control Type

Provides three options to control how data will be fetched:

None, Limit By Count, and Limit By Value.

None: All the records with the value of the reference column greater than the offset will be read.

Limit By Count: Only the mentioned number of records with the value of the reference column greater than the offset will be read.

Limit By Value: All the records with the value of the reference column greater than the offset and less than the Column Value field will be read.

For None and Limit By Count, it is recommended that the table have data in sequential and sorted (increasing) order.


Delta (Batch and Streaming)

On the Delta Lake channel, you can read data from a Delta Lake table on S3, HDFS, GCS, ADLS, or DBFS. Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads. All data in Delta Lake is stored in Apache Parquet format. Delta Lake provides the ability to specify your schema and enforce it, along with timestamps.

Configuring Delta Data Source

To add a Delta Data Source into your pipeline, drag the Data Source to the canvas and click on it to configure.

Under the Schema Type tab, you can Upload Data File and Fetch From Source. Below are the configuration details of the Delta Source (Batch and Streaming):

Field

Description

Source

Select source for reading the delta file from the available options in the drop down list: HDFS, S3, GCS, DBFS and ADLS.

Provide below fields if the user selects HDFS source for reading the data:

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

Username

Once the Override Credentials option is checked, provide the user name through which the Hadoop service is running.

HDFS File Path

Provide the file path of HDFS file system.

Time Travel Option

Select one of the time travel options:

- None: Option not to choose Time Travel.

- Version: Specify the version of delta file in order to fetch the older snapshot of the table with given version number.

- Timestamp: Specifies last modified time of file. All the files that have their last modified time greater than the present value should be read.

Note: The Time Travel option is not available for Streaming Delta source.

Provide below fields if the user selects S3 source for reading the data:

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

AWS Key Id

Provide the S3 account access key.

Secret Access Key

Provide the S3 account secret key.

Note: Once the AWS Key Id and Secret Access Key is provided, user has an option to test the connection.

S3 Protocol

Select the S3 protocol from the drop down list. Below protocols are supported for various versions when user selects S3 connection type:

- For HDP versions, S3a protocol is supported.

- For CDH versions, S3a protocol is supported.

- For Apache versions, S3n protocol is supported.

- For GCP, S3n and S3a protocol is supported.

- For Azure S3n protocol is supported. Read/Write to Mumbai and Ohio regions is not supported.

- For EMR S3, S3n, and S3a protocol is supported.

- For AWS Databricks, s3a protocol is supported.

Bucket Name

Provide the S3 bucket name.

Path

Provide the sub-directories of the bucket name on which the data is to be written.

Time Travel Option

Select one of the time travel options:

- None: Option not to choose Time Travel.

- Version: Specify the version of the delta file in order to fetch the older snapshot of the table with the given version number.

- Timestamp: Specifies last modified time of file. All the files that have their last modified time greater than the present value should be read.

Note: The Time Travel option is not available for Streaming Delta source.

Provide below fields if the user selects GCS source for reading the data:

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

Service Account Key

Upload the Service Account File.

User has an option to test connection.

Bucket Name

Provide the GCS bucket name.

Path

Provide the sub-directories of the bucket name on which the data is to be written.

Time Travel Option

Select one of the time travel options:

- None: Option not to choose Time Travel.

- Version: Specify the version of delta file in order to fetch the older snapshot of the table with given version number.

- Timestamp: Specifies last modified time of file. All the files that have their last modified time greater than the present value should be read.

Note: The Time Travel option is not available for Streaming Delta source.

Provide below fields if the user selects DBFS source for reading the data:

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

Directory Path

Provide the DBFS parent path for check-pointing.

DBFS File Path

Provide the DBFS file path.

Provide below fields if the user selects ADLS source for reading the data:

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Container Name

Provide the container name for Azure Data Lake Storage.

ADLS File Path

Provide the directory path for the Azure Data Lake Storage file system.

Time Travel Option

Select one of the time travel options:

- None: Option not to choose Time Travel.

- Version: Specify the version of delta file in order to fetch the older snapshot of the table with given version number.

- Timestamp: Specifies last modified time of file. All the files that have their last modified time greater than the present value should be read.

Note: The Time Travel option is not available for Streaming Delta source.

ADD CONFIGURATION

To add additional custom properties in key-value pairs.

Environment Params

User can add further environment parameters. (Optional)


Configure Pre-Action in Source

BIGQUERY

The configuration for BIGQUERY is mentioned below:

Field

Description

Connection Name

Mention the connection name for creating connection.

Load From Big Query Table/ Load From Query Results

Choose one of the options.

Dataset Name

Upon choosing, ‘Load From Big Query Table’, the user is required to mention the dataset name.

Table Name

Upon choosing, ‘Load From Big Query Table’, mention the table name.

Project ID of Dataset

Upon choosing, ‘Load From Big Query Table’, mention the Google Cloud project ID. If not specified, the project from the service account key of connection will be used.

Columns to Fetch

Upon choosing ‘Load From Big Query Table’, enter a comma-separated list of columns to select.

Where Condition

Upon choosing, ‘Load From Big Query Table’, enter the where condition.

Partition Filter Condition

Upon choosing, ‘Load From Big Query Table’, enter the partition filter condition.

Load From Query Results

Upon selecting this option, the user will be required to mention, ‘Location of Datasets used in the query’.

Maximum Parallelism

Mention the maximum number of partitions to split the data into.

Add Configuration

The user can add further configurations.

Schema Results

Under schema results, select the Big Query Dataset name and Big Query Table Name.

Details

Under details the user will be able to view the:

Table Expiration

Number of Rows

Last Modified

Data Location

Table ID

Table Size

Created

Table Schema

Table schema details can be viewed here.


Next, in the Detect Schema window, the user can set the schema as dataset by clicking on the Save As Dataset checkbox. Click Next. The user can set the Incremental Read option.

Note: The user can enable the Incremental Read option if there is a date or integer column in the input data.

Field

Description

Enable Incremental Read

Check this checkbox to enable the incremental read support.

Column to Check

Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, and decimal types of values.

Start Value

Mention the value of reference column. Only the records whose value of the reference column is greater than this value will be read.

Read Control Type

Provides 3 options to control how data will be fetched:

None, Limit by Count and Limit by Value.


Cassandra

Configure Pre-Action in Source

Field

Description

Type

The type of thing you will register.

Name

Name of the thing.

Attributes

Attributes of associated thing type.


The Cassandra Data Source reads data from a Cassandra cluster using the specified keyspace name and table name.

Configuring Cassandra Data Source

To add a Cassandra Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, you can Upload Data File and Fetch From Source.

Note: The Cassandra keyspace name and table name should exist in the Cassandra cluster.

Field

Description

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Keyspace

Cassandra keyspace name.

Table Name

Table name inside the keyspace from which the data is read.

Add configuration

To add additional custom properties in key-value pairs.


Configure Pre-Action in Source

CDC

The CDC Data Source processes Change Data Capture (CDC) information provided by Oracle LogMiner redo logs from Oracle 11g or 12c.

CDC Data Source processes data based on the commit number, in ascending order. To read the redo logs, CDC Data Source requires the LogMiner dictionary.

Follow the Oracle CDC Client Prerequisites, before configuring the CDC Data Source.

Oracle CDC Client Prerequisites

Before using the Oracle CDC Client Data Source, complete the following tasks:

1.    Enable LogMiner.

2.    Enable supplemental logging for the database or tables.

3.    Create a user account with the required roles and privileges.

4.    To use the dictionary in redo logs, extract the Log Miner dictionary.

5.    Install the Oracle JDBC driver.

Task 1. Enable LogMiner

LogMiner provides redo logs that summarize database activity. The Data Source uses these logs to generate records.

LogMiner requires an open database in ARCHIVELOG mode with archiving enabled. To determine the status of the database and enable LogMiner, use the following steps:

1.    Log into the database as a user with DBA privileges.

2.    Check the database logging mode:

select log_mode from v$database;


If the command returns ARCHIVELOG, you can skip to Task 2.

If the command returns NOARCHIVELOG, continue with the following steps:

3.    Shut down the database.

shutdown immediate;


4.    Start up and mount the database:

startup mount;


5.    Enable archiving and open the database:

alter database archivelog;


alter database open;


Task 2. Enable Supplemental Logging

To retrieve data from redo logs, LogMiner requires supplemental logging for the database or tables.

1.    To verify if supplemental logging is enabled for the database, run the following command:

SELECT supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_all FROM v$database;


For 12c multi-tenant databases, best practice is to enable logging for the container for the tables, rather than the entire database. You can use the following command first to apply the changes to just the container:

ALTER SESSION SET CONTAINER=<pdb>;


You can enable identification key or full supplemental logging to retrieve data from redo logs. You do not need to enable both:

To enable identification key logging

You can enable identification key logging for individual tables or all tables in the database:

For individual tables

Use the following commands to enable minimal supplemental logging for the database, and then enable identification key logging for each table that you want to use:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;


For all tables

Use the following command to enable identification key logging for the entire database:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;

To enable full supplemental logging

You can enable full supplemental logging for individual tables or all tables in the database:

For individual tables

Use the following commands to enable minimal supplemental logging for the database, and then enable full supplemental logging for each table that you want to use:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;


For all tables

Use the following command to enable full supplemental logging for the entire database:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;


To submit the changes

ALTER SYSTEM SWITCH LOGFILE;


Task 3. Create a User Account

Create a user account to use with the Oracle CDC Client Data Source. You need the account to access the database through JDBC.

Create accounts differently based on the Oracle version that you use:

Oracle 12c multi-tenant databases

For multi-tenant Oracle 12c databases, create a common user account. Common user accounts are created in cdb$root and must use the convention: c##<name>.

1.    Log into the database as a user with DBA privileges.

2.    Create the common user account:

ALTER SESSION SET CONTAINER=cdb$root;


CREATE USER <user name> IDENTIFIED BY <password> CONTAINER=all;


GRANT create session, alter session, set container, select any dictionary, logmining, execute_catalog_role TO <username> CONTAINER=all;


ALTER SESSION SET CONTAINER=<pdb>;


GRANT select on <db>.<table> TO <user name>;


Repeat the final command for each table that you want to use.

When you configure the origin, use this user account for the JDBC credentials. Use the entire user name, including the "c##", as the JDBC user name.

Oracle 12c standard databases

For standard Oracle 12c databases, create a user account with the necessary privileges:

1.    Log into the database as a user with DBA privileges.

2.    Create the user account:

CREATE USER <user name> IDENTIFIED BY <password>;


GRANT create session, alter session, select any dictionary, logmining, execute_catalog_role TO <user name>;


GRANT select on <db>.<table> TO <user name>;


Repeat the last command for each table that you want to use.

When you configure the Data Source, use this user account for the JDBC credentials.

Oracle 11g databases

For Oracle 11g databases, create a user account with the necessary privileges:

1.    Log into the database as a user with DBA privileges.

2.    Create the user account:

CREATE USER <user name> IDENTIFIED BY <password>;


GRANT create session, alter session, select any dictionary, logmining, execute_catalog_role TO <user name>;

GRANT select on <db>.<table> TO <user name>;

GRANT select on v$logmnr_parameters to <user name>;


GRANT select on v$archived_log to <user name>;


GRANT select on <db>.<table> TO <user name>;


Repeat the final command for each table that you want to use.

When you configure the origin, use this user account for the JDBC credentials.

Task 4. Extract a Log Miner Dictionary (Redo Logs)

When using redo logs as the dictionary source, you must extract the Log Miner dictionary to the redo logs before you start the pipeline. Repeat this step periodically to ensure that the redo logs that contain the dictionary are still available.

Oracle recommends that you extract the dictionary only at off-peak hours since the extraction can consume database resources.

To extract the dictionary for Oracle 11g or 12c databases, run the following command:

EXECUTE DBMS_LOGMNR_D.BUILD(OPTIONS=> DBMS_LOGMNR_D.STORE_IN_REDO_LOGS);


To extract the dictionary for Oracle 12c multi-tenant databases, run the following commands:

ALTER SESSION SET CONTAINER=cdb$root;


EXECUTE DBMS_LOGMNR_D.BUILD(OPTIONS=> DBMS_LOGMNR_D.STORE_IN_REDO_LOGS);


Task 5. Install the Driver

The Oracle CDC Client origin connects to Oracle through JDBC. You cannot access the database until you install the required driver.

Note: The JDBC jar is mandatory for Gathr installation and is stored in the Tomcat lib and Gathr third-party folders.


The Data Source can create records for the INSERT, UPDATE, and DELETE operations for one or more tables in a database.

Configuring CDC Data Source

To add a CDC Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Source Tables:

To configure your schema, you can either use configured pluggable database or non-pluggable database.

Field

Description

Use Pluggable Database

Select the option for 12c multi-tenant databases.

Pluggable Database Name

Name of the pluggable database that contains the schema you want to use. Use only when the schema was created in a pluggable database.

Container Database Connection

Use the user account created for the Data Source. For Oracle 12c multi-tenant databases, this is the common user account.

Schema Name

Provide the schema name.

Tables

Tables on which data will be processed.


Field

Description

Operations

Operations for creating records.

INSERT

DELETE

UPDATE

Database Time Zone

Time zone of the database, used when the database operates in a different time zone from Data Collector.

Maximum Transaction Time

Time in seconds to wait for changes for a transaction. Enter the longest period that you expect a transaction to require. Default is 60 seconds.

LogMiner Session Window

Time in seconds to keep a LogMiner session open. Set to larger than the maximum transaction length. Reduce when not using local buffering to reduce LogMiner resource use. Default is 7200 seconds.

Local Buffering

TRUE:-

Using local buffers, the Data Source requests the transactions for the relevant tables and period. The Data Source buffers the resulting LogMiner redo SQL statements until it verifies a commit for a transaction. After the commit, it parses and processes the committed data. The Data Source can buffer the redo SQL statements completely in memory.

FALSE:-

When using Oracle LogMiner buffers, the Data Source requests data from Oracle LogMiner for a particular period. LogMiner then buffers all transactions for that period for all the tables in the database, rather than only the tables needed by the origin.

Max Batch Size (records)

Maximum number of records processed at one time. Keep values up to the Data Collector maximum batch size. Default is 1000.

LogMiner Fetch Size

Minimum number of records to fetch before passing a batch to the pipeline. Keep this value low to allow the origin to pass records to the pipeline as soon as possible, rather than waiting for a larger number of records to become available. Lower values can increase throughput when writes to the destination system are slow. Default is 1.

Query Timeout

Time to wait before timing out a LogMiner query. Default is 300 seconds.

Start From

Start From is the point in the LogMiner redo logs where you want to start processing. When you start the pipeline, the Oracle CDC Client starts processing from the specified initial change and continues until you stop the pipeline. It is sub-categorized as follows:

1. From the latest change

The origin processes all changes that happen after you start the pipeline.


2. From a specified date-time:

The origin processes all changes that occurred at the specified date-time and later. Use the format: DD-MM-YYYY HH24:mm:ss.


3. From a specified system-change number (SCN)

The origin processes all changes that occurred at the specified SCN (system change number) and later. When using the specified SCN, the origin starts processing with the timestamps associated with the SCN. If the SCN cannot be found in the redo logs, the origin continues reading from the next higher SCN that is available in the redo logs.


Container

The Container Data Source is used to read data from Couchbase. This Data Source is used as a caching container, which reads both aggregated data and raw data from the bucket.

Configuring a Container Data Source

To add a Container Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema tab, you can Upload Data File or Fetch From Source.

Field

Description

Load From Template

Select the template from the list.

Only those templates will be visible in the drop down list that are created globally or within that workspace.

Sync With Source Template

When a saved template is edited, this option is reflected. If any change is made in the source template, the corresponding changes will reflect in the Container’s configuration.

Connection Name

Name of the connection

Bucket Name

Couchbase Bucket name.

Select the Couchbase bucket that gets generated with the Couchbase connection.

You can also create a bucket and fetch details in the fields.

Bucket password

Couchbase Bucket password

Memory Quota

Memory Quota in megabytes per server node. Memory Quota cannot be less than 100 MB.

Replicate view indexes

By checking the Replicate view indexes checkbox, you ensure that view indexes, as well as data, are replicated.

Flush

When flushed, all items in the bucket are removed as soon as possible.

Note: This does not immediately show in item count statistics with memcached buckets.

Record Limit

Enable the limit to fetch the number of records from Couchbase.

If you select Yes, the following fields are populated; otherwise, you can move to Fetch Data.

Max no. of Records

Maximum number of records to be fetched from the Couchbase bucket

Overflow Condition

Fetch the records based on order for the selected column. (Fetch minimum records for ascending order and fetch maximum records for descending order)

Note: With aggregation, select the grouping field in the Overflow condition.

Fetch Data

Raw Data:

Fetch the raw data from the Couchbase bucket.

Aggregated Data:

Fetch the aggregated data from the Couchbase. A new field is populated, where you can group the fields from the schema.

Grouping Fields

Field of selected message on which group by is applied.

Fields

Apply functions on the input and output fields.

Click on the NEXT button. An option to Save as Template will be available. Add notes in the space provided and click on Save as Template

Choose the scope of the Template (Global or Workspace) and Click Done for saving the Template and configuration details.

Configure Pre-Action in Source

Custom Channel

Custom Data Source allows you to read data from any data source.

You can write your own custom code to ingest data from any data source and build it as a custom Data Source. You can use it in your pipelines or share it with other workspace users.

How to Create Custom Code Jar

Create a jar file of your custom code and upload it in a pipeline or as a registered component utility.

To write a custom code for your custom Data Source, follow these steps:

1.    Download the Sample Project. (Available on the home page of Data Pipeline).

Import the downloaded Sample project as a maven project in Eclipse. Ensure that Apache Maven is installed on your machine and that the PATH for the same is set on the machine.

2.    Implement your custom code and build the project. To create a jar file of your code, use the following command: mvn clean install -DskipTests.


For a Custom Data Source, add your custom logic in the implemented methods of the classes as mentioned below:

High-level abstraction

If you want high-level abstraction using only Java code, then extend BaseSource as shown in the SampleCustomDataSource class (a minimal sketch follows the method list below):

com.yourcompany.component.ss.DataSource.SampleCustomDataSource, which extends BaseSource

Methods to implement:

public void init(Map<String, Object> conf)

public List<String> receive()

public void cleanup()
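A minimal sketch of such a class is shown below. It assumes the BaseSource contract listed above; the configuration key and the sample records are illustrative only, and the package should follow the sample project layout.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Minimal sketch only: BaseSource comes from the Gathr sample project.
public class SampleCustomDataSource extends BaseSource {

    private String prefix;

    @Override
    public void init(Map<String, Object> conf) {
        // Read component configuration supplied from the pipeline designer ("prefix" is illustrative).
        this.prefix = String.valueOf(conf.getOrDefault("prefix", "record"));
    }

    @Override
    public List<String> receive() {
        // Return one batch of messages; each String is treated as a single message.
        return Arrays.asList(prefix + "-1", prefix + "-2");
    }

    @Override
    public void cleanup() {
        // Release any resources (connections, file handles) opened in init().
    }
}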

Low-level abstraction

If you want a low-level implementation using the Spark API, then extend AbstractDataSource as shown in the SampleSparkSourceDataSource class (a minimal sketch follows the method list below):

com.yourcompany.component.ss.DataSource.SampleSparkSourceDataSource, which extends AbstractDataSource

Methods to implement:

public void init(Map<String, Object> conf)

public Dataset<Row> getDataset(SparkSession spark)

public void cleanup()
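A minimal sketch of the low-level variant is shown below. It assumes the AbstractDataSource contract listed above; the tiny single-column dataset stands in for data that a real source would read from an external system.

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Minimal sketch only: AbstractDataSource comes from the Gathr sample project.
public class SampleSparkSourceDataSource extends AbstractDataSource {

    @Override
    public void init(Map<String, Object> conf) {
        // Read component configuration supplied from the pipeline designer.
    }

    @Override
    public Dataset<Row> getDataset(SparkSession spark) {
        // Build a small single-column dataset; a real source would use the Spark API
        // to read from an external system here.
        return spark.createDataset(Arrays.asList("rec-1", "rec-2"), Encoders.STRING()).toDF("value");
    }

    @Override
    public void cleanup() {
        // Release any resources opened in init().
    }
}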

Configuring Custom Data Source

While uploading data, you can also upload a Dataset in the Custom Data Source.

Field

Description

Data Source Plugin

Fully qualified name of a custom code class.


Upload the custom code jar using the Upload jar button from the pipeline designer page. You can use this Custom Data Source in any pipeline.

Click Done to save the configuration.

Data Generator

Data Generator Data Source generates test data for testing your pipelines. Once the pipeline is tested with fields and their random data, you can replace the Data Generator Data Source.

Configuring Data Generator Data Source

To add a Data Generator Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, you can only Upload Data File to this Data Source.

Configuration tab:

Upload Data:

You can upload a file, of either JSON or CSV format to test data.

Every row from the file is treated as a single message.

Check the Repeat Data option if you wish to continuously generate data from the uploaded file.

Click on the add notes tab. Enter the notes in the space provided. Click Done to save the configuration.

Dataset Channel

This component is supported in Gathr on-premise. Users have an option to utilize any existing dataset as a channel in the data pipeline.

On the Schema Type tab, update configuration for required parameters with reference to below table:

Field

Description

Select Dataset Type

Option to select a specific type of dataset (for example: HDFS, Hive, JDBC or S3) or all types of dataset to filter and narrow down the selection of required dataset in the subsequent step.

Select Dataset & Version

Select any of the existing dataset and its version that you want to use as a channel in the pipeline.

Max No of Rows

Maximum number of sample rows to pull from the Dataset Source. 100 by default.

Trigger time for Sample

Minimum wait time before the system fires a query to fetch data. Example: if it is 15 secs, the system will first wait for 15 secs, then fetch all the rows available and create a dataset out of them. 3 seconds by default.

Sampling Method

Dictates how to extract sample records from the complete data fetched from the source.


Following are the ways:


-Top N: Extract top n records using limit() on dataset.


-Random Sample: Extract random sample records by applying a sample transformation and then limiting to the maximum number of rows in the dataset.


Once the Schema and Rules for the existing dataset are validated, click Next and go to the Incremental Read tab.

Incremental Read

Field

Description

Enable Incremental Read

Check this checkbox to enable incremental read support.

Column to Check

Select a column on which incremental read will work. Displays the list of columns that has integer, long, date, timestamp, decimal types of values.

Start Value

Mention a value of the reference column, only the records whose value of the reference column is greater than this value will be read.

Read Control Type

Provides three options to control the data to be fetched - None, Limit By Count, and Limit By Value.

None: All the records with the value of the reference column greater than the offset will be read.

Limit By Count: Only the mentioned number of records with the value of the reference column greater than the offset will be read.

Limit By Value: All the records with the value of the reference column greater than the offset and less than the Column Value field will be read.

For None and Limit By Count, it is recommended that the table have data in sequential and sorted (increasing) order.


Click NEXT to detect schema from the File Path file. Click Done to save the configuration.

Configure Pre-Action in Source.

Dummy

A Dummy channel is required in cases where the pipeline has a processor and emitter that do not require a channel.

For example, you may only want to check the business logic or the processing of data, and not require a data source to generate data for the pipeline. With StreamAnalytix it is mandatory to have an emitter in the pipeline; in such a scenario, you can use a Dummy Channel to test the processors without generating data through an actual channel.

GCS (Batch and Streaming)

StreamAnalytix provides batch and streaming GCS (Google Cloud Storage) channels. The configuration for GCS data source is specified below:


Field

Description

Connection Name

Select GCP connection name for establishing connection.

Bucket Name

Provide path of the file for Google storage bucket name.

Path

Provide the end path value with * in case of a directory, e.g., outdir.*


Note:

- The user can add configuration by clicking the ADD CONFIGURATION button.

- Next, in the Detect Schema window, the user can set the schema as a dataset by clicking on the Save As Dataset checkbox.

- The Incremental Read option is available in the GCS batch data source and not in the GCS streaming channel.

Configure Pre-Action in Source

File Reader

This component is supported in Gathr on-premise. A local file data source allows you to read data from the local file system, that is, the file system where Gathr is deployed.

Note: File reader is for batch processing.

The batch data can be fetched from the source, or you can upload the files.

The Schema Type tab allows you to create a schema and the fields. On the Detect Schema tab, select a Data Source or Upload Data.

Field

Description

File Path

The local file path from where the data will be read.


Click NEXT to detect schema from the File Path file. Click Done to save the configuration.

Configure Pre-Action in Source

HDFS

This is a Batch component.

To add an HDFS Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Schema Type tab allows you to create a schema and the fields. On the Detect Schema tab, select a Data Source or Upload Data.

For an HDFS Data Source, if data is fetched from the source and the type of data is CSV, the schema has an added option, ‘Is Header Included in source’.

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

HDFS file Path

HDFS path from where data is read. For Parquet, provide the .parquet file path.


Click on the Add Notes tab. Enter the notes in the space provided.

Click Done to save the configuration.

Configure Pre-Action in Source

Hive

To use a Hive Data Source, select the connection and specify a warehouse directory path.

Note: This is a batch component.

To add a Hive Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File.

Configuring Hive Data Source

Field

Description

Message Type

Single: If only one type of message will arrive on the Data Source.

Multi: If more than one type of message will arrive on the Data Source.

Message Type

Select the message you want to apply configuration on.


Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Query

Write a custom query for Hive.

Refresh Table Metadata

Spark Hive caches the Parquet table metadata and partition information to increase performance. This option allows you to refresh the table cache to get the latest information during inspect. This feature helps the most when there are multiple update and fetch events in the inspect session.

The Refresh Table option also repairs and syncs partitioned values into the Hive metastore. This allows the latest values to be processed while fetching data during inspect or run.

Table Names

User can specify single or multiple table names to be refreshed.


After the query, Describe Table and the corresponding Table Metadata, Partition Information, Serialize and Reserialize Information are populated.

Make sure that the query you run matches the schema created with Upload Data or Fetch From Source.

Click Done to save the configuration.

Configure Pre-Action in Source

HTTP

Through HTTP Data Source, you can consume different URI and use that data as datasets in pipeline creation.

For example, suppose a URL returns a list of employees in JSON, TEXT, or CSV format. You can consume this URL through the HTTP channel and use it as a dataset for performing operations.

Configuring HTTP Data Source

To add a HTTP into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Field

Description

URI

HTTP or HTTPS URL to fetch data.

Request Method

HTTP request method: GET or POST.

Request Body

Request body for method.

Header

Header’s parameter name and value.

Accept Type

Content type that URI accepts. For example: JSON, CSV and Text.

Auth Type

Used to specify the authorization type associated with the URL. Includes None, Basic, Token and OAuth2 as authorization types. All the types are explained below in detail, followed by a usage sketch.

Path to message

JSON path expression to be evaluated to get the input data per message. Works only for Fetch From Source with JSON.


Auth Types

1. None

This option specifies that the URL can be accessed without any authentication.

2. Basic

This option specifies that accessing the URL requires Basic Authorization. Provide a user name and password for accessing the URL.

User name

Enter the user name for accessing the URL.

Password

Enter the password for accessing the URL.


3. Token Based

Token-based authentication is a security technique that authenticates users who attempt to log in to a server, a network, or another secure system, using a security token provided by the server.

Token ID

Key with which token is referred in request.

Token

Token which is provided in order to access the URL.


4. OAuth2

OAuth2 is an authentication technique in which the application gets a token that authorizes access to the user's account.

AuthURL

The endpoint for authorization server, which retrieves authorization code.

Client ID

The client ID, given to the client during the application registration process.

Secret Key

The secret key provided to the client during application registration process.

Header   

Used to specify the headers associated with the AuthURL, through which the authorization code is generated.
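
As an illustration of how such an endpoint might be consumed outside Gathr, here is a minimal Python sketch using the requests library with token-based authorization; the URI, header key, and token value are hypothetical placeholders.

    import requests

    # Hypothetical endpoint returning a JSON list of employees.
    uri = "https://example.com/api/employees"

    # Token-based auth: the Token ID is the header key, the Token is its value.
    headers = {"Authorization": "Bearer <token>", "Accept": "application/json"}

    response = requests.get(uri, headers=headers, timeout=30)
    response.raise_for_status()

    # The JSON payload can now be used as a dataset for further operations.
    employees = response.json()
    print(len(employees), "records fetched")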


Configure Pre-Action in Source

Impala

This component is supported in Gathr on-premise. Under the Schema Type tab, select Fetch From Source or Upload Data File.

When Fetch From Source is chosen, the schema tab appears after the configuration; when you upload data, the schema tab appears before the configuration.

Configuring Impala Data Source

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Query

Write a custom query for Impala.

Fetch Size

The fetch size determines the number of rows to be fetched per round trip.


Click on the Add Notes tab. Enter the notes in the space provided.

Click Done to save the configuration.

JDBC

JDBC Channel supports Oracle, Postgres, MySQL, MSSQL, and DB2 connections.

You can configure and test the above-mentioned connections with JDBC. After configuring the JDBC channel, it allows you to extract data from DB2 and the other sources into your data pipeline in batches.

Note: This is a batch component.

Prerequisite: Upload appropriate driver jar as per the RDBMS used in JDBC Data Source. Use the upload jar option.

For using DB2, create a successful DB2 Connection.

Configuring JDBC Data Source

To add a JDBC into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch From Source, Upload Data File, or Use Existing Dataset.

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections for data ingestion.

Query

Write a custom query for JDBC.

Enable Query Partitioning

Enables parallel reads of the data returned by the query (see the sketch after this table).

No. of Partitions

Specifies the number of parallel threads to be invoked to read from JDBC in Spark.

Partition on Column

The partitioning column can be any column of type Integer, on which Spark will perform partitioning to read data in parallel.

Lower Bound/ Upper Bound

Value of the lower bound for the partitioning column/ Value of the upper bound for the partitioning column.
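
The partitioning fields above map closely to Spark's standard JDBC read options. A minimal sketch, assuming a Spark session, an already-uploaded driver jar, and a hypothetical MySQL database and employees table:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Connection details are hypothetical; in Gathr they come from the selected connection.
    df = (spark.read.format("jdbc")
          .option("url", "jdbc:mysql://dbhost:3306/hr")
          .option("dbtable", "(SELECT * FROM employees) AS src")  # custom query
          .option("user", "reader")
          .option("password", "secret")
          .option("partitionColumn", "employee_id")  # Partition on Column (integer)
          .option("lowerBound", "1")                 # Lower Bound
          .option("upperBound", "100000")            # Upper Bound
          .option("numPartitions", "8")              # No. of Partitions
          .load())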


Metadata

Enter the schema and select table. You can view the Metadata of the tables.

Field

Description

Table

Select table of which you want to view Metadata.

Column Name

Name of the column generated from the table.

Column Type

Type of the column, for example: Text, Int

Nullable

Indicates whether the value of the column can be null or not.


Once the Metadata is selected, Click Next and detect schema to generate the output with Sample Values. The next tab is Incremental Read.

Incremental Read

Configure the fields below to enable reading data incrementally.

Field

Description

Enable Incremental Read

Check this check-box to enable incremental read support.

Column to Check

Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, or decimal values.

Start Value

Mention a value of the reference column; only the records whose value of the reference column is greater than this value will be read.

Read Control Type

Provides three options to control the data to be fetched: None, Limit By Count, and Maximum Value.

None: All the records with the value of the reference column greater than the offset will be read.

Limit By Count: The mentioned number of records, with the value of the reference column greater than the offset, will be read.

Maximum Value: All the records with the value of the reference column greater than the offset and less than the Column Value field will be read.

For None and Limit By Count, it is recommended that the table has data in sequential and sorted (increasing) order. A sketch of this incremental read pattern follows this table.
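
A minimal sketch of the incremental read pattern itself, with hypothetical table and column names; Gathr tracks the offset for you, this only shows the idea of reading past a stored value and advancing it.

    # Pattern only: read records newer than the stored offset, then move the offset forward.
    def build_incremental_query(table, column, start_value, limit=None):
        query = f"SELECT * FROM {table} WHERE {column} > {start_value}"
        if limit is not None:  # Limit By Count
            query += f" ORDER BY {column} LIMIT {limit}"
        return query

    offset = 1000  # Start Value
    query = build_incremental_query("orders", "order_id", offset, limit=500)
    # After the batch is read, the new offset becomes the maximum order_id fetched.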


Click Done to save the configuration.

Configure Pre-Action in Source

Kafka

Under the Schema Type tab, select Fetch From Source or Upload Data File.

When Fetch From Source is chosen, the schema tab appears after the configuration; when you upload data, the schema tab appears before the configuration.

Configuring Kafka Data Source

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Topic Type

Select one of the below options to fetch the records from Kafka topic(s):

Topic name: Used to subscribe to a single topic.

Topic list: Used to subscribe to a comma-separated list of topics.

Pattern: Used to subscribe to topics matching a Java regex.

With Partitions: Used to specify the exact topic partitions to consume, as a JSON string, e.g., {"topicA":[0,1],"topicB":[2,4]}

The schema must be the same in case of Topic List/Pattern/With Partitions.

Topic Name

Topic in Kafka from where messages will be read.

Topic List/ Pattern/ With partitions

A topic is a category or feed name to which messages are published.

Partitions

Number of partitions. Each partition is an ordered, immutable sequence of messages that is continually appended to a commit log.

Replication Factor

Number of replications. Replication provides stronger durability and higher availability. For example, a topic with replication factor N can tolerate up to N-1 server failures without losing any messages committed to the log.

Define Offset

Following configurations are used for Kafka offset.

Latest: The starting point of the query is just from the latest offset.

Earliest: The starting point of the query is from the first offset.

Custom: A JSON string specifying a starting and ending offset for each partition (see the sketch after this table).

startingOffsets: A JSON string specifying a starting offset for each partition, e.g., {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}

endingOffsets: A JSON string specifying an ending offset for each partition. This is an optional property with default value “latest”, e.g., {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}

Connection Retries

The number of retries for the component connection. Possible values are -1, 0, or any positive number. If the value is -1, there will be infinite retries until the connection succeeds.

Max Offset Per Trigger

Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topic Partitions of different volume.

Fail on Data Loss

Provides the option to fail the query in case of data loss (for example, topics are deleted or offsets are out of range). This may be a false alarm; you can disable it when it does not work as you expect. Batch queries will always fail if they fail to read any data from the provided offsets due to data loss.

Delay Between Connection Retries

Retry delay interval for component connection (in milliseconds).

ADD CONFIGURATION

To add additional custom Kafka properties in key-value pairs.
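
The offset settings above correspond to options of Spark Structured Streaming's Kafka source. A minimal sketch with hypothetical broker and topic names:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Custom offsets: start partition 0 of topicA at offset 23 and partition 1 at the earliest offset (-2).
    starting_offsets = '{"topicA":{"0":23,"1":-2}}'

    df = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092")  # from the selected connection
          .option("subscribe", "topicA")                      # Topic Name
          .option("startingOffsets", starting_offsets)        # Define Offset: Custom
          .option("maxOffsetsPerTrigger", "10000")            # Max Offset Per Trigger
          .option("failOnDataLoss", "false")                  # Fail on Data Loss
          .load())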


Click on the Add Notes tab. Enter the notes in the space provided.

Click Done to save the configuration.

Kinesis

Kinesis Data Source allows you to fetch data from Amazon Kinesis stream.

Configuring Kinesis Data Source

Under the Schema Type tab, select Upload Data File.

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Application Name

Name of the application for check pointing.

Stream Name

Name of the Kinesis stream.

Shards Count

Number of shards required to create the stream, if the stream is not already present.

EndPoint

End point is a URL that is the entry point of Kinesis services.

Region

Name of the region. For example, us-west-2

Initial Position

In the absence of a Kinesis checkpoint, this is the worker's initial starting position in the stream (see the sketch after this table).

The values are either the beginning of the stream (subject to the Kinesis retention limit of 24 hours) or the tip of the stream.

TRIM_HORIZON: To read data from the beginning of the stream, use TRIM_HORIZON.

LATEST: To read the latest or most recent records, use LATEST.

Checkpoint Interval

Checkpoint interval for Kinesis check pointing.

This allows the system to recover from failures and continue processing where the Stream left off.

Storage Level

Flag for controlling the storage.

- MEMORY_ONLY

- MEMORY_AND_DISK

- MEMORY_ONLY_SER

- MEMORY_AND_DISK_SER

- MEMORY_ONLY_2

- MEMORY_AND_DISK_2

- DISK_ONLY

- MEMORY_ONLY_SER_2

- DISK_ONLY_2

- MEMORY_AND_DISK_SER_2

Add configuration

Additional properties can be added using Add Configuration link.
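
To make the initial position values concrete, here is a minimal sketch using the boto3 Kinesis client with a hypothetical stream name; Gathr handles this internally through the selected connection.

    import boto3

    # Hypothetical region and stream name.
    kinesis = boto3.client("kinesis", region_name="us-west-2")

    shards = kinesis.describe_stream(StreamName="demo-stream")["StreamDescription"]["Shards"]

    # TRIM_HORIZON starts at the oldest available record; LATEST starts at the tip of the stream.
    iterator = kinesis.get_shard_iterator(
        StreamName="demo-stream",
        ShardId=shards[0]["ShardId"],
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]

    records = kinesis.get_records(ShardIterator=iterator, Limit=100)["Records"]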


Click on the Add Notes tab. Enter the notes in the space provided.

Click Done to save the configuration.

KUDU

Apache Kudu is a column-oriented data store of the Apache Hadoop ecosystem. It enables fast analytics on fast (rapidly changing) data. The channel is engineered to take advantage of hardware and in-memory processing, and it lowers query latency significantly compared to similar tools.

Configuring KUDU Data Source

To add a KUDU Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File.

Field

Description

Connection Name

Connections are the service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Table Name

Name of the table.

Add Configuration

To add additional properties in key-value pairs.


Metadata

Enter the schema and select table. You can view the Metadata of the tables.

Field

Description

Table

Select table of which you want to view Metadata.

Column Name

Name of the column generated from the table.

Column Type

Type of the column, for example: Text, Int

Nullable

Indicates whether the value of the column can be null or not.


Once the Metadata is selected, Click Next and detect schema to generate the output with Sample Values. The next tab is Incremental Read.

Incremental Read

Configure the fields below to enable reading data incrementally.

Field

Description

Enable Incremental Read

Check this check-box to enable incremental read support.

Column to Check

Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, or decimal values.

Start Value

Mention a value of the reference column; only the records whose value of the reference column is greater than this value will be read.

Read Control Type

Provides three options to control the data to be fetched: None, Limit By Count, and Maximum Value.

None: All the records with the value of the reference column greater than the offset will be read.

Limit By Count: The mentioned number of records, with the value of the reference column greater than the offset, will be read.

Maximum Value: All the records with the value of the reference column greater than the offset and less than the Column Value field will be read.

For None and Limit By Count, it is recommended that the table has data in sequential and sorted (increasing) order.


Click on the Add Notes tab. Enter the notes in the space provided.

Click Done to save the configuration.

Configure Pre-Action in Source

MQTT

MQTT Data Source reads data from an MQTT queue or topic.

Configuring Mqtt Data Source

To add a MQTT Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File.

Field

Description

Connection Name

Connections are the service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

QueueName/TopicName

Queue/topic name from which messages will be read.

Add Configuration

To add additional properties in key-value pairs.


Click on the Add Notes tab. Enter the notes in the space provided.

Click Done to save the configuration.

Native DFSReceiver

This Data Source enables you to read data from HDFS. This is a streaming component.

For a NativeDFS Receiver Data Source, if data is fetched from the source, and the type of data is CSV, the schema has an added tab that reads, Is Header Included in source.

This is to signify if the data that is fetched from source has a header or not.

If Upload Data File is chosen, then there is an added tab, which is Is Header included in Source.

All the properties are same as HDFS.

Native File Reader

This component is supported in Gathr on-premise. A Native File Reader Data Source allows you to read data from local file system. Local file System is the File System where Gathr is deployed.

Note: Native File reader is for Streaming processing.

Streaming data can be fetched from source or you can upload the files.

Detect Schema tab allows you to create a schema and the fields. On the Detect Schema tab, select a Data Source or Upload Data.

Field

Description

File Path

The local file path from where the data will be read.


Click NEXT to detect the schema from the file at the configured File Path. Click Done to save the configuration.

OpenJMS

OpenJMS is a messaging standard that allows application components to create, send, receive and read messages.

OpenJMS Data Source reads data from OpenJMS queue or topic.

The JMS (Java Message Service) supports two messaging models:

1. Point to Point: The Point-to-Point message producers are called senders and the consumers are called receivers. They exchange messages by means of a destination called a queue: senders produce messages to a queue; receivers consume messages from a queue. What distinguishes the point-to-point messaging is that only one consumer can consume a message.

2. Publish and Subscribe: Message producers are called publishers and consumers are called subscribers. They exchange messages by means of a destination called a topic: publishers produce messages to a topic, and subscribers consume the messages published to that topic. Unlike point-to-point messaging, a message published to a topic can be consumed by multiple subscribers.

Configuring OpenJMS Data Source

To add an OpenJMS Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File.

Field

Description

Connection Name

Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data.

Queue Name

Name of the queue or topic from which messages will be read.


Click on the add notes tab. Enter the notes in the space provided. Click Done to save the configuration.

Pubsub

The configuration for Pubsub data source is mentioned below:

Field

Description

Connection Name

Select the connection name for creating connection.

Topic Name

Mention the topic name where the data will be published.

Auto-create Subscription Name

Select the checkbox to auto-create the subscription name. Otherwise, enter the subscription name.

Max Size

Enter value for number of messages to be received in one request.


Note:

The user can add further configuration by clicking at the ADD CONFIGURATION button.

The ‘Fetch from Source’ option will work only if the topic and subscription name exist and some data has already been published to them.

Multiple Emitters to Pubsub source are supported only when auto create subscription name is enabled.


RabbitMQ

RabbitMQ Data Source reads messages from the RabbitMQ cluster using its exchanges and queues.

Configuring RabbitMQ Data Source

To add a RabbitMQ into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File.

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Exchange Name

RMQ producers can only send messages to an exchange. The exchange pushes messages to queues from where consumers can read the message.

Exchange Type

Defines the functionality of the exchange, i.e., how messages are routed through it. Exchange types: DIRECT, TOPIC, and FANOUT (see the sketch after this table).

Exchange Durable

Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged when a server restarts.

Exchange Auto Delete

If you set this to True, the exchange is deleted when all queues finish using it.

Routing Key

Identifies a key for redirecting messages to a specific topic.

Queue Name

Name of the queue where data will be published.

Queue Durable

Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged when a server restarts.

Queue Auto Delete

If set, the queue is deleted when all the consumers finish using it.

Add Configuration

To add additional custom RabbitMQ properties in key-value pair.
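
To show how the exchange, queue, and routing key fields relate, here is a minimal sketch using the pika client with hypothetical names; this is general RabbitMQ usage, not Gathr's internal mechanism.

    import pika

    # Hypothetical broker host; in Gathr this comes from the selected connection.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbit-host"))
    channel = connection.channel()

    # Exchange Name / Exchange Type / Exchange Durable / Exchange Auto Delete
    channel.exchange_declare(exchange="orders_exchange", exchange_type="direct",
                             durable=True, auto_delete=False)

    # Queue Name / Queue Durable / Queue Auto Delete
    channel.queue_declare(queue="orders_queue", durable=True, auto_delete=False)

    # The Routing Key binds the queue to the exchange so matching messages reach it.
    channel.queue_bind(queue="orders_queue", exchange="orders_exchange",
                       routing_key="orders.created")

    connection.close()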


Click on the add notes tab. Enter the notes in the space provided.

Click Done to save the configuration.

RDS

RDS Data Source allows you to read from an RDS DB engine, with or without SSL. RDS is a Relational Database Service on the cloud.

RDS Channel can read in batch from the RDS databases (PostgreSQL, MySQL, Oracle, MSSQL). The properties of RDS are similar to those of a JDBC connector, with the addition of SSL security.

SSL security can be enabled on RDS databases.

The system can connect to, read from, and write to an SSL-secured RDS.

If security is enabled, it is configured in the Connection and automatically propagated to the channel.

Please note: SSL Support is not available for Oracle.

Configuring RDS Data Source

To add a RDS Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File:

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections for data ingestion.

Query

Hive compatible SQL query to be executed in the component.

Enable Query Partitioning

Tables will be partitioned and loaded as RDDs if this check-box is enabled. This enables parallel reading of data from the table.

Number of Partitions

Specifies the number of parallel threads to be invoked to read from JDBC in Spark.

Partition on Column

The partitioning column can be any column of type Integer, on which Spark will perform partitioning to read data in parallel.

Lower Bound/Upper Bound

Value of the lower bound for partitioning column/Value of the upper bound for partitioning column.


Metadata

Enter the schema and select table. You can view the Metadata of the tables.

Field

Description

Schema

Schema name for which the list of tables will be viewed.

Table

Select table of which you want to view Metadata.


Once the Metadata is selected, Click Next and go to the Incremental Read tab.

Incremental Read

Configure the fields below to enable reading data incrementally.

Field

Description

Enable Incremental Read

Check this checkbox to enable incremental read support.

Column to Check

Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, or decimal values.

Start Value

Mention a value of the reference column; only the records whose value of the reference column is greater than this value will be read.

Read Control Type

Provides three options to control the data to be fetched: None, Limit By Count, and Maximum Value.

None: All the records with the value of the reference column greater than the offset will be read.

Limit By Count: The mentioned number of records, with the value of the reference column greater than the offset, will be read.

Maximum Value: All the records with the value of the reference column greater than the offset and less than the Column Value field will be read.

For None and Limit By Count, it is recommended that the table has data in sequential and sorted (increasing) order.


RedShift

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud.

Note: This is a batch component.

Configuring RedShift Data Source

To add a Redshift Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File.

Field

Description

Message Type

Single: If only one type of message will arrive on the Data Source.

Multi: If more than one type of message will arrive on the Data Source.

Message Name

Select the message you want to apply configuration on.


Field

Description

Connection Name

Connections are the service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Query

Write a valid query for RedShift

Enable Query Partitioning

Enable Redshift to read parallel data from a running query.

No. of Partitions

Specifies number of parallel threads to be invoked to read from RedShift in Spark.

Partition on Columns

Partitioning is applied on a column of Integer type. Spark performs partitioning to read data in parallel.

Lower Bound

Value of the lower bound for partitioning column

Upper Bound

Value of the upper bound for partitioning column


Click Done to save the configuration.

Configure Pre-Action in Source

S3

S3 data source reads objects from Amazon S3 bucket. Amazon S3 stores data as objects within resources called Buckets.

For an S3 data source, if data is fetched from the source, and the type of data is CSV, the schema has an added tab, Header Included in source.

This is to signify if the data that is fetched from source has a header or not.

If Upload Data File is chosen, then there is an added tab, Is Header Included in Source. This signifies whether the uploaded data includes a header or not.

Configuring S3 Data Source

To add the S3 data source into your pipeline, drag the source to the canvas and click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File.

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

S3 Protocol

Protocols available are S3, S3n, S3a

End Point

S3 endpoint details should be provided if the source is Dell EMC S3.

Bucket Name

Buckets are storage units used to store objects, which consist of data and metadata that describes the data.

Path

File or directory path from where data is to be read.

Add Configuration

To add additional custom S3 properties in a key-value pair.


Click on the add notes tab. Enter the notes in the space provided.

Click Done to save the configuration.

S3 Batch

S3 Batch Channel can read data from S3 buckets in an incremental manner. Amazon S3 stores data as objects within resources called Buckets.

On an S3 Batch Channel, you can read data from the specified S3 bucket in formats such as JSON, CSV, Text, Parquet, and ORC. Only the files modified after the specified time are read, as sketched below.
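
A minimal sketch of the modification-time idea using boto3, with hypothetical bucket and prefix names; Gathr performs this filtering itself, the code only pictures the behavior.

    from datetime import datetime, timezone
    import boto3

    s3 = boto3.client("s3")
    cutoff = datetime(2021, 12, 24, 13, 20, tzinfo=timezone.utc)  # hypothetical offset

    # List objects under a prefix and keep only those modified after the cutoff.
    resp = s3.list_objects_v2(Bucket="demo-bucket", Prefix="input/")
    new_keys = [obj["Key"] for obj in resp.get("Contents", [])
                if obj["LastModified"] > cutoff]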

For an S3 Data Source, if data is fetched from the source, and the type of data is CSV, the schema has an added tab, Is Header Included in source.

This is to signify if the data that is fetched from source has a header or not.

If Upload Data File is chosen, then there is an added tab, Is Header Included in Source. This signifies whether the uploaded data includes a header or not.

Configuring S3 Batch Data Source

To add a S3 Batch Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch From Source or Upload Data File.

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

Provide AWS KeyID and Secret Access Key.

S3 Protocol

Available protocols of S3 are S3, S3n, S3a.

S3a protocol is supported by Databricks.

S3 and S3n are supported by EMR.

S3a is supported by Hadoop Version 3.x.

S3 and S3n protocol is supported by Hadoop Version below 3.x.

End Point

S3 endpoint details should be provided if the source is Dell EMC S3.

Bucket Name

Buckets are storage units used to store objects, which consist of data and metadata that describes the data.

Path

File or directory path from where data is to be read.

Enable Incremental Read

Note: Incremental Read works during Pipeline run.

Offset

Specifies the last modified time of the file. All the files whose last modified time is greater than this value will be read.

Add Configuration

To add additional custom S3 properties in a key-value pair.


Click Next for Incremental Read option.

Note: The incremental Read option is available only for S3 Batch.

Field

Description

Enable Incremental Read

Unchecked by default, check mark this option to enable incremental read support.

Read By

Option to read data incrementally either by choosing the File Modification Time option or Column Partition option.

Upon selecting the File Modification Time option provide the below detail:

Offset

Records with a timestamp value greater than the specified datetime (in UTC) will be fetched. After each pipeline run, the datetime configuration is set to the most recent timestamp value from the last fetched records. The given value should be in UTC with the ISO date format yyyy-MM-dd'T'HH:mm:ss.SSSZZZ, for example 2021-12-24T13:20:54.825+0000 (see the formatting sketch after this table).

Upon selecting the Column Partition option provide the below details:

Column

Select the column for incremental read. The listed columns can be integer, long, date, timestamp, decimal, etc.

Note: The selected column should have sequential, sorted (in increasing order) and unique values.

Start Value

Mention the value of reference column. Only the records whose value of the reference column is greater than this value will be read.

Read Control Type

Provides three options to control the data to be fetched: None, Limit By Count, and Limit By Value.

None: All the records with the value of the reference column greater than the offset will be read.

Limit By Count: The mentioned number of records, with the value of the reference column greater than the offset, will be read.

Limit By Value: All the records with the value of the reference column greater than the offset and less than the Column Value field will be read.

For None and Limit By Count, it is recommended that the table has data in sequential and sorted (increasing) order.
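
A small sketch of producing the Offset value in the expected UTC ISO format; the timestamp itself is arbitrary.

    from datetime import datetime, timezone

    # Format the current UTC time as yyyy-MM-dd'T'HH:mm:ss.SSS+0000.
    now = datetime.now(timezone.utc)
    offset_value = now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}+0000"
    print(offset_value)  # e.g. 2021-12-24T13:20:54.825+0000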


Click on the add notes tab. Enter the notes in the space provided.

Click Done to save the configuration.

Configure Pre-Action in Source

Salesforce

Parse Salesforce data from the source itself or import a file in any supported data type format except Parquet. The Salesforce channel allows you to read Salesforce data from a Salesforce account. Salesforce is a top-notch CRM application built on the Force.com platform. It can manage all the customer interactions of an organization through different media, such as phone calls, site email inquiries, communities, and social media. Reading is done through Salesforce objects specified by the Salesforce Object Query Language (SOQL).

However, there are a few prerequisites.

First, create a Salesforce connection, for which you require the following:

- A valid Salesforce account.

- User name of the Salesforce account.

- Password of the Salesforce account.

- Security token of the Salesforce account.

Configuring Salesforce Data Source

To add a Salesforce Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch from Source or Upload Data File

Now, select a Salesforce connection and write a query to fetch any Salesforce object. Then provide an API version.

Field

Description

Connection Name

Select the Salesforce connection.

Query

Use the Salesforce Object Query Language (SOQL) to search your organization’s Salesforce data for specific information. SOQL is similar to the SELECT statement in the widely used Structured Query Language (SQL) but is designed specifically for Salesforce data. A query sketch follows this table.

Infer Schema

(Optional) Infer the schema from the query results. This finds the data type of the fields specified in the SOQL query. This works only if the number of records is 5 or greater.

Bulk

(Optional) Flag to enable bulk query. This is the preferred method when loading large sets of data. Bulk API is based on REST principles and is optimized for loading large sets of data. You can use it to query many records asynchronously by submitting batches. Salesforce will process batches in the background. The Default value is false.

Salesforce Object

(Conditional) Salesforce Objects are database tables that permit you to store data specific to your organization. This parameter is mandatory when Bulk is true and must be the same as specified in the SOQL query.

Pk Chunking

(Optional) Flag to enable automatic primary key chunking for bulk query job.

This splits bulk queries into separate batches of the size defined by the Chunk Size option. The default is false, and the default chunk size is 100,000. PK Chunking can automatically make large queries manageable when using the Bulk API. PK stands for Primary Key (the object's record ID), which is always indexed. This feature is supported for all custom objects, many standard objects, and their sharing tables.

ChunkSize

The number of records to include in each batch. Default value is 100,000. This option can only be used when Pk Chunking is true. Maximum size is 250,000.

Salesforce Object

Salesforce Object to be updated.

Date Format

A string that indicates the format, following java.text.SimpleDateFormat, used when reading timestamps. This applies to Timestamp Type. By default, it is null, which means timestamps are parsed by java.sql.Timestamp.valueOf().
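
To illustrate the kind of SOQL the Query field expects, here is a minimal sketch using the simple_salesforce Python library; the library choice, credentials, and object names are assumptions for illustration, and Gathr issues the query through its own connector.

    from simple_salesforce import Salesforce

    # Hypothetical credentials; a security token is required alongside the password.
    sf = Salesforce(username="user@example.com", password="secret",
                    security_token="token")

    # SOQL resembles a SQL SELECT but targets Salesforce objects.
    soql = "SELECT Id, Name, AnnualRevenue FROM Account WHERE AnnualRevenue > 1000000"
    result = sf.query(soql)

    for record in result["records"]:
        print(record["Id"], record["Name"])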


Configure Pre-Action in Source

SFTP

The SFTP channel allows the user to read data from a network file system.

Configuring SFTP Data Source

Field

Description

Connection Name

Select the SFTP connection name.

File Path

Mention the file path of SFTP file system.

Note:

In case of AWS, while providing a directory, the user is required to provide the wildcard character (*) along with the directory. For example: /home/centos/foldername/*

Incremental Read

Check mark this option to read only the latest file in case a folder is provided.

Parallelism

Number of parallel threads to launch to download files from SFTP.


Configure Pre-Action in Source

Snowflake

The Snowflake cloud-based data warehouse system can be used as a source/channel in Gathr for configuring ETL pipelines.

Configure the Snowflake source by filling in the below mentioned details:

Field

Description

Connection Name

The user is required to select the Snowflake connection.

Create Warehouse

The user can create a new warehouse by selecting the check-box. The user will be required to upload the file to detect schema.

Note: Upon selecting the Create Warehouse option, the user is required to mention the warehouse configuration details (a CREATE WAREHOUSE sketch follows the warehouse configuration table below).

Warehouse Name

The user is required to mention the warehouse name.

The user can select a warehouse from the existing warehouse list displayed here.

Schema Name

Snowflake schema list of the database of the selected connection is to be mentioned here.

Note: Upon selecting schema, the metadata of the table will be displayed.

Query

The user is required to write the Snowflake SQL query.

Pre-Action Query

A semicolon separated list of SQL commands that are executed before reading data.

Post Action Query

A semicolon separated list of SQL commands that are executed after reading data.

Field

Description

Warehouse Size

User will be required to select a warehouse size.

Maximum Cluster Count

The user will be required to specify the maximum number of clusters for the warehouse.

Scaling Policy

The user is required to mention the scaling policy.

Auto Suspend

The user is required to mention the value in seconds.

Auto Resume

Specifies whether to automatically resume a warehouse when a SQL statement (e.g. query) is submitted to it.

Comments

If needed, the user can mention specific comments.
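
The warehouse options above correspond to parameters of Snowflake's CREATE WAREHOUSE statement. A minimal sketch using the snowflake-connector-python package, with hypothetical account details; Gathr creates the warehouse through the selected connection.

    import snowflake.connector

    # Hypothetical connection parameters.
    conn = snowflake.connector.connect(account="xy12345", user="gathr_user",
                                       password="secret")

    conn.cursor().execute("""
        CREATE WAREHOUSE IF NOT EXISTS demo_wh
          WAREHOUSE_SIZE = 'XSMALL'      -- Warehouse Size
          MAX_CLUSTER_COUNT = 2          -- Maximum Cluster Count
          SCALING_POLICY = 'STANDARD'    -- Scaling Policy
          AUTO_SUSPEND = 300             -- Auto Suspend (seconds)
          AUTO_RESUME = TRUE             -- Auto Resume
          COMMENT = 'created for Gathr pipelines'
    """)
    conn.close()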


Configure Pre-Action in Source

Socket

Socket Data Source allows you to consume data from a TCP data source from a pipeline. Configure Schema type and choose a Socket connection to start streaming data to a pipeline.

Configuring Socket Data Source

To add a Socket Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, select Fetch from Source or Upload Data File.

Field

Description

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Add Configuration

To add additional custom properties in key-value pairs.



Click on the add notes tab. Enter the notes in the space provided. Click Done to save the configuration.   

SQS

This component is supported in Gathr on-premise. SQS Channel allows you to read data from different SQS queues. The queue types supported are Standard and FIFO. Along with maximum throughput, you can achieve scalability by buffering requests independently.

Configuring SQS Data Source

To add a SQS Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema tab, you can select the Max No. of Rows and Sampling Method as Top-N or Random Sample.

Field

Description

Connection Name

Connection Name for creating connection.

Queue Type

Select type of the queue: Standard or FIFO.

Queue Name

Name of the queue where data will be published.

Visibility Timeout

The length of the time (in seconds) that a message received from a queue will be invisible to other receiving components.

Message Retention Period (in seconds)

The amount of time Amazon SQS will retain a message if it does not get deleted.

Maximum Message Size (in bytes)

Maximum message size (in bytes) accepted by Amazon SQS.

Receive Message Wait Time (in seconds)

The maximum amount of time that a long polling receive call will wait for a message to become available before returning an empty response.

Delivery Delay (in seconds)

The amount of time to delay the first delivery of all messages added to this queue.


Stream Cosmos

Stream Cosmos Channel can read data in a streaming manner from CosmosDB (SQL API). It reads updated documents by specifying a change feed directory and using its options.

Note: Both BatchCosmos and StreamCosmos work with Local Session.

Configuring a Stream Cosmos Data Source

To add a Stream Cosmos Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, you can Upload Data File and Fetch From Source.

Field

Description

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Database

Cosmos Database.

Container

Container name in Cosmos.

Change Feed from Beginning

If set to True, data will be read from beginning.

CosmosDB Checkpoint Directory

It is the file path where Cosmos stores the checkpoint data for Change feed.

Add Configuration

To add additional custom properties in key-value pairs.


Streaming Delta

On the Delta Lake Channel, you can read data from a Delta Lake table on S3, HDFS, or DBFS.

Configuring a Streaming Delta Data Source.

Field

Description

Source

Mention the source name.

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Check-mark the checkbox. Provide the username through which the Hadoop service is running.

HDFS File Path

Provide file path of the HDFS file system.

S3 Protocol

S3 protocol to be used.

Bucket Name

Mention Bucket Name.

Path

Sub-directories of the bucket mentioned above from which the data is to be read.

Add configuration

To add additional custom properties in key-value pairs.


Vertica

This component is supported in Gathr on-premise. Vertica Channel supports Oracle, Postgres, MySQL, MSSQL, and DB2 connections.

You can configure and connect the above-mentioned DB engines with JDBC. After configuring the channel, it allows you to extract data from DB2 and the other sources into your data pipeline in batches.

Note: This is a batch component.

Prerequisite: Upload appropriate driver jar as per the RDBMS used in JDBC Data Source. Use the upload jar option.

For using DB2, create a successful DB2 Connection.

Configuring a Vertica Data Source

To add a Vertica Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Field

Description

Connection Name

Connection Name for creating connection.

Query

Hive compatible SQL query to be executed in the component.


Metadata

Enter the schema and select table. You can view the Metadata of the tables:

Field

Description

Table

Select table of which you want to view Metadata.

Column Name

Name of the column generated from the table.

Column Type

Type of the column, for example: Text, Int

Nullable

Indicates whether the value of the column can be null or not.


Once the Metadata is selected, Click Next and detect schema to generate the output with Sample Values. The next tab is Incremental Read.

Incremental Read

Configure the fields below to enable reading data incrementally.


Field

Description

Enable Incremental Read

Check this check-box to enable incremental read support.

Column to Check

Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, or decimal values.

Start Value

Mention a value of the reference column; only the records whose value of the reference column is greater than this value will be read.

Read Control Type

Provides three options to control the data to be fetched: None, Limit By Count, and Maximum Value.

None: All the records with the value of the reference column greater than the offset will be read.

Limit By Count: The mentioned number of records, with the value of the reference column greater than the offset, will be read.

Maximum Value: All the records with the value of the reference column greater than the offset and less than the Column Value field will be read.

For None and Limit By Count, it is recommended that the table has data in sequential and sorted (increasing) order.


Click Done to save the configuration.

Configure Pre-Action in Source

Tibco

This component is supported in Gathr on-premise. A TIBCO Enterprise Management Service (EMS) server provides messaging services for applications that communicate by monitoring queues.

Configuring Tibco Data Source

To add a Tibco Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema Type tab, you can Upload Data File and Fetch From Source.

Field

Description

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Messaging Model

Option to select the messaging model as either Topic or Queue.

Queue Name

The queue name should be specified.

Durable

Durable name should be specified.

Number of Consumers

Number of consumers should be specified.

Batch Size

Maximum number of records processed at one time. Keep values up to the Data Collector maximum batch size.

Connection Timeout

Connection timeout in milliseconds should be specified. 2000 by default.

Connection Retries

Maximum number of connection retries allowed should be specified. 3 by default.


Click on the add notes tab. Enter the notes in the space provided. Click Done to save the configuration.

VSAM

VSAM Channel reads data stored in COBOL EBCDIC and Text Format.

Configuring a COBOL Data Source

To add a VSAM Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.

Under the Schema tab, you can select the Max No. of Rows and Sampling Method as Top-N or Random Sample.

Field

Description

HDFS Connection

Container connection name for creating connection.

Data Type

Two types of data type are supported:

EBCDIC: Extended Binary Coded Decimal Interchange Code is an 8-bit character encoding used on IBM mainframes and AS/400s.

TEXT: Text files are also accepted as data type.

Generate Record ID

Every generated record will have a primary record id corresponding to every entry.

HDFS Copy Book Path

A copybook is the schema file for COBOL (a read sketch follows this table). This is where you can use the Scope Variable using @. To know more, read about Scope Variable.

HDFS Data Path

The file path in HDFS from where the data will be read.
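
For context on how a copybook drives the schema of an EBCDIC file, here is a minimal sketch assuming the open-source Cobrix (spark-cobol) package; the format name, option, and paths are assumptions for illustration, not Gathr's internal implementation.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Assumes the Cobrix spark-cobol package is available; paths are hypothetical.
    df = (spark.read.format("cobol")
          .option("copybook", "hdfs:///schemas/accounts.cpy")  # HDFS Copy Book Path
          .load("hdfs:///data/accounts.dat"))                  # HDFS Data Path

    df.printSchema()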


Configure Pre-Action in Source