Pipeline Management

Gathr provides tools, such as Auto Inspect, Upload Pipeline, Data Preparation, Download Sample Project, and Monitor, to manage pipelines at all stages of their lifecycle.

Local Inspect

A Local Inspect connection can be created at any time and from anywhere in the workspace. If you do not create a connection explicitly, it is initialized automatically when you start Auto-Inspection, and you can disconnect from it after the pipeline has been created. Local Inspect does not need to be in an initialized state for a pipeline to be edited and saved.

localExtraJava

While connecting the Local Inspect session, you can specify additional Extra Java Options by clicking the Spark Parameters option. This parameter is optional.

In Spark Parameters, you can supply Extra Java Options for the inspect JVM process. These parameters are disabled for a superuser.

Validations

This section describes the notifications generated when an unsupported operation is performed in a pipeline. You will be notified about the unsupported operation via the window shown below:

validations1

Any operation that causes an Error, Warning, or Recommendation is listed under Pipeline Notifications.

Listed below are some of the change notifications that are generated for the following processors:

pipelinenotification

Aggregation

Multiple aggregation operations on a Streaming Dataset are unsupported.

Distinct

Distinct is not supported on a Streaming Dataset.

Sort

Sorting is not supported on Streaming Datasets, unless it is applied with aggregation in Complete output mode.
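
For reference, the following is a minimal Spark Structured Streaming sketch of the supported case. It uses the built-in rate source as a stand-in for a real streaming Data Source and is illustrative only, not Gathr-specific.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.col

  val spark = SparkSession.builder().appName("sort-on-stream").master("local[*]").getOrCreate()

  // The built-in rate source stands in for a real streaming Data Source
  val events = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

  // Sorting a plain streaming Dataset would be rejected; sorting an aggregated
  // result emitted in Complete output mode is allowed
  val counts = events.groupBy((col("value") % 10).as("bucket")).count()
  counts.orderBy(col("count").desc)
    .writeStream.outputMode("complete").format("console").start()
    .awaitTermination()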

Dedup

Dedup is not supported after aggregation on a streaming Dataset.

Limit

Limits are not supported with Streaming Datasets.

Union

Union is not supported between Streaming and Batch Datasets.

Join

List of Join Operations in Spark:

Left Input | Right Input | Join Type   | Supported or Not
Static     | Static      | All Types   | Supported
Stream     | Static      | Inner       | Supported
Stream     | Static      | Left Outer  | Supported
Stream     | Static      | Right Outer | Not Supported
Stream     | Static      | Full Outer  | Not Supported
Static     | Stream      | Inner       | Supported
Static     | Stream      | Left Outer  | Not Supported
Static     | Stream      | Right Outer | Supported
Static     | Stream      | Full Outer  | Not Supported
Stream     | Stream      | Inner       | Supported
Stream     | Stream      | Left Outer  | Supported
Stream     | Stream      | Right Outer | Supported
Stream     | Stream      | Full Outer  | Not Supported
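
As a point of reference, below is a minimal sketch of a Stream-Static Inner join in Spark Structured Streaming. The Kafka topic, bootstrap servers, and file path are illustrative assumptions, not values from Gathr.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("stream-static-join").getOrCreate()

  // Static (batch) lookup data -- illustrative path; columns: crop, fertilizer
  val cropInfo = spark.read.json("/data/crop_info")

  // Streaming input -- illustrative Kafka topic carrying a crop name per message
  val events = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "crops")
    .load()
    .selectExpr("CAST(value AS STRING) AS crop")

  // Stream-Static Inner and Left Outer joins are supported; a Right or Full Outer
  // join here would be rejected, as per the table above
  val joined = events.join(cropInfo, Seq("crop"), "inner")

  joined.writeStream.format("console").start().awaitTermination()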

Auto Inspection

When your component is inspect-ready, slide the Auto Inspect button.

Auto Inspection lets you verify the component’s data against the schema as per the component’s configuration. This happens as soon as the component is saved.

Auto inspect is triggered manually; therefore, while editing the pipeline, you can choose not to inspect it. When you edit a pipeline, auto inspect is off by default.

While editing, you can add or remove components in the pipeline without auto inspect.

You can auto inspect the properly configured components of the pipeline if the pipeline is not entirely configured.

AutoInsoectionInProgress

Auto inspection can be stopped at any point, even while the schema is being detected, and the pipeline can be saved even if the inspection hangs or fails.

Clicking on this button stops the auto-inspection of an existing pipeline.

stop_autoinspect

If one of the components is not configured, the auto inspect will run for the configured components only. The auto inspect window will show a notification in Red for the unconfigured component.

auroinspect1

A single component can be inspected by clicking on the eye icon on top of the components as shown below:

autoinspect1

If a pipeline is saved without inspection, the pipeline tile on the Data Pipeline homepage shows a warning icon for not inspected. Once you click on the exclamation mark, the Pipeline Notifications are shown.

autoinspectedNOT

Once the data is loaded from the source, it is automatically played through the pipeline. During auto inspect, if the schema changes due to any configuration changes, the subsequent components are not reset; instead, an exclamation mark is shown.

This is how the corresponding configuration, logic, and results from each operator are validated while the pipeline is being designed.

Clone Pipeline

The Clone Pipeline functionality helps you create a new pipeline from an existing one by giving it a new name. When configuring a pipeline for new use cases, it is often useful to clone an existing pipeline.

Select Clone Pipeline on the pipeline tile.

A pop up window with Pipeline configuration will open.

Edit the cloned pipeline’s configuration as per requirement, under the Pipeline tab.

Under the Component tab, you can edit the component’s connection details.

Provide a unique name to your cloned pipeline as shown below:

ClonePipeline

Below shown is the Component tab:

ClonePipelineComponent

Data Preparation in Gathr

Data preparation is the process of collecting, cleaning and organizing data for analysis. Prepare the data with operations and expressions to generate customized data. Graphs and statistics are also used to present the prepared data.

To perform data preparation, configure a Data Source (for example, Kafka). Once the Data Source is configured and saved, you can run inspection on the columns of the schema that are reflected in the Data Preparation window. This allows you to build the pipeline while you are interacting with the data.

To perform Data Preparation, click on the eye icon of any component (Data Source, Processor, or Emitter). A sub-section on the pipeline canvas opens with the component’s schema displayed in the Data Preparation window.

In the example below, an Agriculture data file is uploaded. It holds information about crop, soil, climate, pests, season, average_yield, cost, high_production_state, crop_wastage and fertilizer.

By default, the data is displayed in Summary view with Profile Pane on the screen.

Data Pane: The schema takes the form of columns and is divided into records. You can apply operations on an entry of a column or any combination of columns; for each operation a new expression is added, which keeps building your pipeline as you apply more actions.

dataprepwindow

The data preparation window has the following options:

Create Column: Create a new column by clicking on the Create Column button. Specify the column name and an expression, and a new column will be created with the given values.

Create_column

Remove Column: Remove the columns that are no longer required. Click on the Keep/Remove button and the selected columns will be kept or removed respectively.

remove_column

Profile Pane: Click on the Profile Pane button. The profile pane shows the distribution of data in each column and allows you to create an interactive data pipeline. Bar graphs corresponding to the values in each column are shown in tabular format.

Data Pane: Click on the Data Pane button. This pane shows the data in its original form. You can arrange the columns in ascending or descending order by clicking on the column headers.

Display Columns: Select the columns that are to be displayed.

Reload Inspect: Clicking on this button reloads the auto-inspect data for the selected component type.

Maximize: This button is used to maximize the auto inspect window.

Close: This button is used to close the auto inspect window.

Search Value: Search for a value.

The data preparation window has two more views: Summary View and Detailed View.

Summary View shows the number of records displayed out of the total records, along with the columns in each record. It also shows the name of the current schema.

The Detailed View shows details of a selected column value and its corresponding rows.

Detailed View has eight sections. All the sections are displayed and explained below:

DataPrep-Column

No. | Field                    | Description
1   | Data Type of the Column  | Shows the data type of the column. Each data type is denoted by a unique symbol: 123 for Numeric, abc for String.
2   | Unique Value             | The count of distinct values of the column across all the records.
3   | Sort                     | Allows you to sort all the records on the basis of a specific column, by Count (Ascending or Descending) or by Domain (A to Z or Z to A).
4   | Operations               | Operations that can be performed on the respective column of the schema.
5   | Column Name              | Name of the column. It is an editable field.
6   | Search Value             | Allows you to search for a record.
7   | Scrubber                 | A scrubber corresponds to the unique values of a column. It is only available when there are more than 10 unique values. Its height can only accommodate 50 records; therefore, if there are more than 50 records, it shows the highest value in a batch.
8   | Records                  | All the values/records of the column are listed.

An important and elaborate section amongst the ones mentioned in the table above is Operations.

Operations

This option is at the top right of the column. The gear icon, which opens a sliding window, shows a list of operations (shown in image below) that can be performed on the respective column of the schema.

Operations

Whenever an operation is applied on one or more columns, a step is created and reflected in the left section of the Data Preparation window. A processor is also added to the pipeline, depending on the operation applied.

These data preparation processors are defined below in the Data Preparation Processors section.

a. Filter

Filter the values based on a set of criteria (shown in image below) such as, Equals, Contains, Regex match and so on. Custom filter can be used for a custom value.

filter_number_operations

b. Transforms

Transform the values of a column by applying a transform operation or expression to it, for example changing the case of values (available options are shown in the image below).

Transform

c. Missing Value Replacement:

Replace the missing or null values with either Literal or Expression value.

missingvalue

Literal: Replaces Null and/or empty string values with a specified string literal.

Expression: Replaces Null and/or empty string values with a specified expression value.   
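
The equivalent Spark logic looks roughly as follows. This is a sketch only: df is assumed to be a DataFrame holding the Agriculture example data (soil and cost columns), and the replacement values are illustrative.

  import org.apache.spark.sql.functions.{avg, coalesce, col, lit, when}

  // Literal: null or empty soil values are replaced with the string "unknown"
  val withLiteral = df.withColumn("soil",
    when(col("soil").isNull || col("soil") === "", lit("unknown")).otherwise(col("soil")))

  // Expression: null cost values are replaced with the column's average
  val avgCost = df.agg(avg("cost")).first().getDouble(0)
  val withExpr = df.withColumn("cost", coalesce(col("cost"), lit(avgCost)))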

d. Analyze

Under Analyze, you can view the Null values of the selected column.

Analye

Under Statistics, you can view the following statistics computed over the entire column:

  • Minimum

  • Maximum

  • Mean

  • Median

  • Standard Deviation

  • Mode

  • Distinct

  • Sum

  • Range
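
For reference, most of these statistics map directly onto Spark aggregate functions. A minimal sketch for the cost column of the Agriculture example, assuming df is a DataFrame holding that data:

  import org.apache.spark.sql.functions.{countDistinct, max, mean, min, stddev, sum}

  // Min, Max, Mean, Standard Deviation, Distinct count and Sum for one column;
  // Range can be derived as max - min, and Median via percentile_approx
  df.agg(
    min("cost"), max("cost"), mean("cost"),
    stddev("cost"), countDistinct("cost"), sum("cost")
  ).show()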

e. Pivot

A pivot operation is a kind of aggregation where one (or more) of the grouping columns has its distinct values transposed into separate columns.

Original Data

A      | B      | C     | D
apple  | type 1 | small | 1
apple  | type 1 | large | 2
apple  | type 1 | large | 2
apple  | type 2 | small | 3
apple  | type 2 | small | 3
banana | type 1 | large | 4
banana | type 1 | small | 5
banana | type 2 | small | 6
banana | type 2 | large | 7

Pivot Operation in Gathr

  • Pivot column – C

  • Grouping fields – A, B

  • Pivot values – small, large

  • Type of aggregation – Sum

  • Aggregation Column - D

Pivoted Data:

A      | B      | large | small
apple  | type 2 | null  | 6
banana | type 2 | 7     | 6
apple  | type 1 | 4     | 1
banana | type 1 | 4     | 5
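
In Spark terms, the pivot described above corresponds roughly to the following sketch, which rebuilds the original data and applies the same grouping, pivot values, and aggregation:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.sum

  val spark = SparkSession.builder().appName("pivot-demo").master("local[*]").getOrCreate()
  import spark.implicits._

  // The original data from the table above
  val df = Seq(
    ("apple", "type 1", "small", 1), ("apple", "type 1", "large", 2),
    ("apple", "type 1", "large", 2), ("apple", "type 2", "small", 3),
    ("apple", "type 2", "small", 3), ("banana", "type 1", "large", 4),
    ("banana", "type 1", "small", 5), ("banana", "type 2", "small", 6),
    ("banana", "type 2", "large", 7)
  ).toDF("A", "B", "C", "D")

  val pivoted = df
    .groupBy("A", "B")                   // grouping fields
    .pivot("C", Seq("small", "large"))   // pivot column and pivot values
    .agg(sum("D"))                       // aggregation on column D

  pivoted.show()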

g. Group By

You can apply aggregation on a column using the Group By operation and save the output in a field, with or without the grouped columns.

groupBy
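
A rough Spark equivalent, using the crop and cost columns from the Agriculture example and assuming df holds that data (illustrative only):

  import org.apache.spark.sql.functions.sum

  // Total cost per crop; the grouped column (crop) is kept in the output
  val costPerCrop = df.groupBy("crop").agg(sum("cost").as("total_cost"))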

h. Rename Column

Rename the column name.

i. Create New Column

Create a new column using this operation.

CreateNewColumn
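
In Spark terms this corresponds to adding a derived column. The expression below is an illustrative assumption based on the Agriculture example columns, with df assumed to hold that data:

  import org.apache.spark.sql.functions.expr

  // New column computed from existing ones
  val withRatio = df.withColumn("yield_per_cost", expr("average_yield / cost"))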

j. Remove Column

Remove the selected column.

Data Preparation Processors

Data Preparation operations add the following processors to your data pipeline:

  • Expression Filter

  • Expression Evaluator

  • Rename

  • Select

  • Drop

DataPrep-Operations1

Data Preparation steps appear on the left side of the inspect window, as a sliding panel.

As data preparation operations are performed, they keep getting added to the data pipeline and are reflected in the data preparation steps.

Even when data preparation processors are added manually to a pipeline, they are reflected in the data preparation steps.

Expression Filter:

Expression Filter enables filter operations on the data column.

This processor gets automatically added on the canvas once you apply the filter operation.

For example, let us add the filter criteria as “crop” equals “cereals”.

ExpressFilter

Field          | Description
VALIDATE       | Validates the expression specified.
UPLOAD         | Expressions can be uploaded using this option.
ADD EXPRESSION | Additional expressions can be added using the ADD EXPRESSION button.

For example, if you want to filter the records where crop is ‘wheat’, you can write the following expression:

crop <=> 'wheat'

validateExpressionFilter
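
The expression uses Spark SQL syntax; as a sketch, the same filter applied directly in Spark looks like this, assuming df holds the Agriculture data ('<=>' is null-safe equality):

  import org.apache.spark.sql.functions.expr

  // Keeps the records whose crop value equals 'wheat' (null-safe comparison)
  val wheatRecords = df.filter(expr("crop <=> 'wheat'"))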

Expression Evaluator

Expression Evaluator is used when you want to perform transform operations on the data columns. This processor gets automatically added on the canvas once you perform any transform operation on the data columns.

Field          | Description
VALIDATE       | Validates the transform expression corresponding to the operation applied.
UPLOAD         | User can upload expression file.
ADD EXPRESSION | Additional expressions can be added using the ADD EXPRESSION button.

For example, write the below expression to change the values of the soil column to uppercase.

Expression: Upper(soil)
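
As a sketch, the same transform expressed directly in Spark, assuming df holds the Agriculture data:

  import org.apache.spark.sql.functions.expr

  // Overwrites the soil column with its uppercase value
  val transformed = df.withColumn("soil", expr("upper(soil)"))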

Scheduling

To execute a workflow periodically, click on the Scheduling tab and select any of the following options to schedule it.

To access the Scheduling Tab, go to the Data Pipeline home page and click on the ellipsis.

The user can schedule a Batch Data Pipeline at fixed intervals through the Scheduling option. The pipeline can be scheduled using two options, i.e., Normal Scheduler and Cron Scheduler.

Normal Scheduler

To schedule a pipeline using the Normal Scheduler option, follow the below steps:

Schedule: User can schedule the pipeline for Days, Hours or Minutes.

  • Days: The user can specify the number of days for which the data pipeline scheduling can be set.

  • Hours: The user can specify the number of hours for which the data pipeline scheduling can be set.

  • Minutes: The user can specify the number of minutes for which the data pipeline scheduling can be set.

Number of Days: The user can specify the number of days for which the data pipeline scheduling can be set.

Repeat Count: The user can specify the number of times a pipeline has to be repeated.

Cron Scheduler

The user can schedule a pipeline for the below mentioned time frames:

  • Minutes

  • Hourly

  • Daily

  • Monthly

  • Yearly

Scheduling_Options

If Monthly is selected, then you can choose from the options shown below:

SCHEDULE/UN-SCHEDULE: This action will schedule/un-schedule any scheduled workflow/pipeline and mark as terminated all the coordinated actions that are in a running or waiting state.

The History of the pipeline run can be viewed under the History tab:


Auto Inspect Subsequent Sections

Whenever a column is either modified or removed in a component, a message appears asking whether you want to auto-update the subsequent components. When you click Yes, the pipeline auto-updates the subsequent components and marks as stale all those components that are impacted by the modification or removal of a particular column from the previous component.

As shown, Expression Evaluator is being updated:

AutoUpdateComponentMessage

A warning will be displayed for the components that fail the auto-update operation:

Auto_Update_WAR

Remove Stale Configuration

In a Join processor, if any field is removed from the source dataset, that field will be marked in red. The same applies if that field is used in any join condition.

You can remove all red columns by clicking REMOVE STALE CONFIGURATIONS in the projection pane of the Join processor.

joinCondition1

removestale

removestale22

Auto-connect Components

Drag, drop and connect a component with auto-connect feature.

Drag a component near an existing valid component, and as the dragged component gets closer to the available component on canvas, they are highlighted. Drop the component and it will be connected to the data pipeline.

For example: In the following image, RabbitMQ component was dragged near S3 and S3 is highlighted. Once RabbitMQ is dropped near S3, both the components will get auto-connected.

data_pipeline

Version Control

Version control allows you to create a version of a data pipeline.

Gathr supports version control with the Gathr Metastore.

The pipeline versions are saved in the underlying database of Gathr.

A Working Copy is the current version of the pipeline, or its first (0) version.

Pipeline editing is always performed on the Working Copy.

To create a version:

The first way is to re-save an edited pipeline; you then get an option to Create Version.

createdefinition

The second way to create a version of your pipeline is while editing and saving it. Among the options for saving the pipeline are the Create Version and Switch Version buttons.

switchandcreteversion

The third way to create a version is from the pipeline tile.

ActionsonPipeline

For the second and third options, the window shown below opens.

createversion

Mention a description or a comment for the version and click on Create Version. Once you save the version, the last version on the pipeline changes from ‘Working Copy’ to Version 1.

When the user selects any one of the options to create a pipeline version, a notification is displayed stating “Version added in Queue for creation”, followed by “Version creation completed”.

A pipeline remains a Working Copy as long as no other version is created. Once you save a version, the next version that will be created is indicated next to the title ‘Create Subsystem version’.

Switch Version

If you want to switch the version of a pipeline, click on Switch Version on the pipeline editor page and choose a version. It will change the pipeline as per the selected version.

switchVersion

The selected version is loaded as the Working Copy. Editing is always performed on the Working Copy of the pipeline.

Download Version

Download Version from pipeline tile allows you to download a version of the pipeline.

versionList

Click on the Download arrow to download the pipeline.

Property       | Description
Version        | Version number of the pipeline.
Commit Message | Commit message description.
Artifacts      | The artifact uploaded.
Commit Time    | The time at which the pipeline was committed as a version.
Download       | Download the pipeline.

Download/Upload Pipeline

Download Pipeline

A pipeline can be downloaded along with its component’s configurations such as Schema, Alert configuration, and Registered entities.

Upload Pipeline

A pipeline can be uploaded with its component configurations such as schema, alert definitions and registered entities.

To upload a pipeline, click on the Upload Pipeline button on the Data Pipeline-home page.

upload pipe

If there are any duplicate pipeline components, a confirmation page will appear as shown below:

overwriter button

Click on the Create Version button to see the Exported Pipeline Details page, which provides the details of the pipeline and the environment from which it was downloaded.

Click on the Overwrite button and the existing pipeline will be overwritten with the configuration of the uploaded pipeline.

To create a new pipeline, click New Pipeline.

exported

Click on Proceed, and update the pipeline configuration details. Once the configuration is completed, upload the pipeline and it will get listed on the Pipeline page.
