Create a Workflow

To create a workflow, follow the steps below:

Go to the Workflow option in your project and click the Create New Workflow button.


You will be redirected to the Workflow Definition page.

A Workflow tile has the following fields:

Creation Date: The date when the workflow was created.
Subworkflow: If a workflow has a subworkflow in it, this field shows the subworkflow count. This is also called the parent-child marker; if a workflow has zero subworkflows, it is a child workflow. You can click on the count, and it will show the names of the subworkflows and their creation dates.
Pipeline: If a pipeline is used in the workflow, this field shows the number of pipelines used. You can click on the count, and it will show the names of the pipelines and their creation dates.
Timezone: The timezone in which the workflow is scheduled.
Last Test Status: If the workflow was tested before scheduling it, this field shows the status of that test, whether it was successful or not.
Schedule: Once the workflow is created, deploy it to Airflow by clicking the SCHEDULE button.
Test: Test a workflow before deploying it on Airflow.
Edit: Edit a workflow.
Delete: Delete the workflow.

Define a new Workflow here.


This page has nodes to create and execute the workflow. They are explained below:

Nodes

To define a workflow, four nodes are available:

  • Control Nodes

  • Pipelines

  • Actions

  • Workflows

Add a Control node with one or more pipelines, with actions applied to them, and save your workflow. Once workflows are saved, you can also concatenate workflows from the Workflow tab.

Control Nodes

Two types of control nodes are available:


Start: This node is mandatory for defining a workflow and represents the logical start of the workflow. You can use only one Start node in a workflow.
Dummy: This node controls the flow of the workflow based on a trigger condition, or groups tasks defined in the workflow.

Pipelines

Batch Pipelines created in the workspace are reflected here.


Actions

Action nodes provide the functionality of the following actions to a workflow. Each action node is explained below:

Assignment Operator

This operator is used to assign workflow-level variables and their values.

You can define multiple variables and their values by clicking the + ADD VARIABLE button. Make sure the variable values are valid Python data types.

Variable Name: Provide the variable name.
Variable Value: Provide the variable value; number, decimal, and string values are supported.
Use as Scope Variable(s): By selecting the 'Use as Scope Variable(s)' option as True, the variables defined above can be used as scope variables.
Retries: Number of retries.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).
Duration: The duration decided for the SLA.
Value: Value of the SLA duration.
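
Since the variable values must be Python data types, each value is written as a Python literal. Below is a minimal sketch of name/value pairs as they might be entered in the Assignment Operator; the names and values are placeholders, not taken from the product:

    # Hypothetical workflow variables; each value is a valid Python literal.
    workflow_variables = {
        "run_date": "2024-01-01",   # string
        "batch_size": 500,          # number (integer)
        "threshold": 0.75,          # decimal (float)
    }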

BASH Operator

This node is used to execute a bash script, a command, or a set of commands. Following are the configuration properties of the Bash Operator.

Command: The command, set of commands, or reference to a bash script to be executed.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).
Duration: The duration decided for the SLA.
Value: Value of the duration.

Email

This node is used for sending emails. The details provided are routed to the mail server configured in Airflow during installation.

Set the following configurations for this node.

To: Provide mail IDs for To.
CC: Provide mail IDs for CC.
BCC: Provide mail IDs for BCC.
Subject: Email subject.
Body: Email body.
SLA: Provide SLA-related details (if required).
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.

HDFS Sensor

The HDFS Sensor is used to check whether a given location (file or folder path) has landed on HDFS. If the sensor finds the given location on HDFS within the given time interval, it is considered successful; otherwise, it fails.

Following are the configurations for this node.

Connection ID: Select the connection ID from the drop-down, as defined in Airflow, to connect with the required HDFS.
Time Out Interval: Maximum time (in seconds) for which the sensor will check the given location once the node is triggered.
Poke Interval: Time interval (in seconds) for which the sensor will wait between each connection attempt.
Directory or File Path: HDFS location. It can be a directory or a file.
File Size: Provide the file size (in MBs). The sensor will wait until the file reaches the given size.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

In the above configuration, once the HDFS Sensor is triggered, it keeps checking the location (/sample/file/path) every 20 seconds for a duration of 300 seconds.

HTTP Operator

This operator is used to hit an endpoint on an HTTP system.

Following are the properties under the configuration:

Connection ID: Select the connection ID from the drop-down list, as defined in Airflow, to connect with the required HTTP system.

HTTP Method: Select one of the HTTP methods. The available options are POST and GET. POST carries the request parameters in the message body, while GET carries the request parameters appended to the URL string; the user gets a response based on the information provided in the payload/REST parameters.

End Point: The endpoint on the HTTP system that is called to execute an action.

Example: Gathr/service/connections/rabbitmq/connection/Default/test

Request Data: For a POST or PUT request, the data is the content-type parameter; for GET, a dictionary of key/value string pairs is accepted. However, the data has to be passed in JSON format.

Example for GET:

https://10.xx.xx.238:xxx/Gathr/datafabric/func/list?projectName=XXX_1000068&projectVersion=1&projectDisplayName=xxxx&pageView=pipeline

Example for POST:

{"connectionName":"JDBC_MYSQL_AUTO","connectionId":"xxxxx","componentType":"jdbc","tenantId":"xxx","connectionJson":"{\"databaseType\":\"mysql\",\"driver\":\"\",\"connectionURL\":\"\",\"driverType\":\"\",\"databaseName\":\"xxx\",\"host\":\"xxx\",\"port\":\"xxx\",\"username\":\"xxxx\",\"password\":\"xxxxx\"}"}

Header: HTTP headers to be added to the request (in JSON format). The header is used to authenticate requests on the server.

Example: {"Content-Type": "application/json", "token": "xxx"}

Retries: Number of times the workflow tries to run the task in case of failure.

Trigger Rule: Rule to define the trigger condition of this node.

SLA: Provide SLA-related details (if required).

Kafka Alert Operator

The Kafka Alert Operator is used to send an alert/message to a Kafka topic.

Kafka Brokers: Enter the list of Kafka brokers along with their ports.
Kafka Topic: Enter the Kafka topic on which the alert/message should be sent.
Message: Enter the message that needs to be sent.
Security: Select the Kafka security protocol: SASL or SSL. If SSL is selected, additional configuration is required: CA File, Cert File, and Key File. If SASL is selected, the configurations from Username to SASL Mechanism are populated, as shown below.
CA File: Filename of the CA file to be used in certificate verification.
Cert File: Filename of the file in PEM format that contains the client certificate, as well as a CA certificate.
Key File: Filename of the file that contains the client private key.
Username: Provide the Kafka username.
Password: Provide the Kafka password.
Security Protocol: Provide the SASL security protocol.
SASL Mechanism: Provide the SASL mechanism.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

MSSQL Operator

This operator is used to execute SQL statements on a Microsoft SQL database.

Connection ID: Select the connection ID from the drop-down, as defined in Airflow, to connect with the required MSSQL database.
Query: SQL queries used to perform operations.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

MySQL Operator

This operator is used to execute SQL statements on a MySQL database.

Connection ID: Select the connection ID from the drop-down, as defined in Airflow, to connect with the required MySQL server.
Query: SQL queries used to perform operations.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

Pause Operator

The Pause Operator is used to pause the current workflow. Provide the configuration details below to configure the Pause Operator:

Retries: Provide a value for the number of retries.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

Pipeline Operator

The Pipeline Operator is used to run a selected pipeline. You can select the pipelines that need to run. The function of this operator is the same as that of a Pipeline node.

Set the following configurations for this operator.

Pipeline To Run: Select the pipeline that the workflow should run.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

POSTGRES Operator

This operator is used to execute SQL statements on a PostgreSQL database.

Connection ID: Select the connection ID from the drop-down, as defined in Airflow, to connect with the required Postgres database.
Query: SQL queries used to perform operations.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

Custom Python Operator

This operator allows you to execute custom logic/code in a workflow. You can write custom code in Python, and it will be executed by the workflow.

Write the custom code in a Python method and provide the method name that should be invoked by the workflow. In addition, you can also get, set, or update workflow variables in the custom logic.

Get variable value:

get_dag_variable(variable_name, variable_type=None)

This method is used to get a workflow variable. Arguments are:

variable_name: variable name

variable_type (optional): provide variable type

Set variable value:

set_dag_variable(variable_name, variable_value)

This method is used to set/update a workflow variable. Arguments are:

variable_name: provide variable name

variable_value: provide variable value
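
Below is a minimal sketch of a custom method that uses these calls, assuming get_dag_variable and set_dag_variable are available to the custom code as described above. The method name, variable names, and argument key are placeholders:

    # Placeholder method name; provide it in the Method Name field of this node.
    # The method should only take keyword arguments.
    def run_custom_logic(**kwargs):
        # Value passed through the Argument field of the node configuration.
        arg_value = kwargs.get('kwargs_value')

        # Read an existing workflow variable (placeholder name).
        run_date = get_dag_variable('run_date')

        # Set/update a workflow variable (placeholder name and value).
        set_dag_variable('last_run_date', run_date)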

Python Code: Write the custom Python code.
Argument: Argument to pass to the Python code in kwargs; by using kwargs_val as the key, you get the value in the Python code. The default value is 'None'. For example: value = kwargs.get('kwargs_value')
Method Name: Provide the method name that will be invoked by the workflow to execute the custom code. The method should only take keyword arguments.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

SFTP Operator

This operator is used for transferring files from a remote host to local, or vice versa.

Connection ID: Select the connection ID from the drop-down, as defined in Airflow, to connect with the required SFTP server.
Local File Path: Local file path for GET or PUT.
Remote File Path: Remote file path for GET or PUT.
Operation: Specify the operation, 'get' or 'put'; the default is put.
Create: Select True to create intermediate directories.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

SFTP Sensor

The SFTP Sensor is used to check whether a given location (file or folder path) has landed on SFTP. If the sensor finds the given location on SFTP within the given interval, it is considered successful; otherwise, it fails.

Connection ID: Select the connection ID from the drop-down, as defined in Airflow, to connect with the required SFTP server.
Time Out Interval: Maximum time (in seconds) for which the sensor will check the given location once triggered.
Poke Interval: Time interval (in seconds) for which the sensor will wait between tries.
Directory or File Path: SFTP location. It can be a directory or a file.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

SQL Sensor

The SQL Sensor runs the SQL statement repeatedly until the first cell of the result is no longer in (0, '0', '').

It runs the SQL statement at every poke interval until the time-out interval is reached.

Connection ID: Select the connection ID from the drop-down, as defined in Airflow, to connect with the required SQL database.
Time Out Interval: Maximum time (in seconds) for which the sensor will check the given location once triggered.
Poke Interval: Time interval (in seconds) for which the sensor will wait between tries.
Directory or File Path: SFTP location. It can be a directory or a file.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

SSH Operator

The SSH Operator is used to run commands on a remote machine. For that, the user needs to provide the following.

Connection ID: Select the connection ID from the drop-down, as defined in Airflow, to connect with the required remote machine.
Command: The command, set of commands, or reference to a bash script to be executed.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

Time Delta Sensor

Waits for a given amount of time before succeeding. The user needs to provide the following configurations.

Time Out Interval: Maximum time for which the sensor will wait.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

Time Window Operator

The Time Window Operator is used to check whether the current execution falls within a given time window. It also checks whether the current execution date falls on a calendar holiday. If the current execution time is not within the given time window, or the current execution date falls on a calendar holiday, the operator returns False and is considered failed.

From Time: Provide the start of the time window, in the format HH:MM:SS.
To Time: Provide the end of the time window, in the format HH:MM:SS.
Check For Holiday: Select True to check whether the current execution date falls on a calendar holiday. When you select True, additional configuration is populated.
Calendar: Select a calendar for holidays. These are the calendars created from the Register Entities > Calendars section.
Include Weekend: Select True if you want to include weekends in holidays.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

Vertica Operator

This operator is used to execute SQL statements on a Vertica database.

Connection ID: Select the connection ID from the drop-down, as defined in Airflow, to connect with the required Vertica database.
Query: SQL query that is used to perform the operation.
Retries: Number of times the workflow tries to run this task in case of failure.
Trigger Rule: Rule to define the trigger condition of this node.
SLA: Provide SLA-related details (if required).

Workflow

All the workflows created in Gathr for the logged-in user are listed on the Workflow home page.

You can add a workflow as an operator inside another workflow (similar to a pipeline). This workflow acts as a sub-workflow: a separate instance of the workflow is executed as a sub-workflow.


Workflow Configuration

Create Workflow

Once a workflow is defined, provide a workflow name and click the Save Workflow button to create it. You can also create a workflow using another workflow (Workflow tile).


Once you have defined and created a workflow, you can perform the following operations on it:

Edit Workflow

You can edit a workflow, but not while it is active.

Subworkflow

A workflow can have another workflow in it. This enables the parent-child marker on the workflows.

A parent workflow is shown with a Parent Workflow marker icon; similarly, a child workflow is shown with a Child Workflow marker icon.

On every tile, the count of pipelines and subworkflows is shown up to level 1.

This means that if a workflow has 2 subworkflows, you will be able to see their names on the tile. However, if it contains further subworkflows or pipelines, you will have to keep clicking and expanding them on the workflow definition page. For example, you can view the three subworkflows under t2, but to check t3, you need to click on it and view the subworkflow in edit mode.

When you click on the pipeline/workflow count, the pipeline names and creation dates appear in a popup. On clicking an individual pipeline/workflow, you are redirected to its edit view.

In the edit view, you can drill down into every workflow to its last level. Double-click on any workflow to view the window shown below:

Schedule Workflow

Once the workflow is created, deploy it by clicking the SCHEDULE button. Set your workflow scheduling configuration as defined below:


Start Date: The date from which the workflow starts.
End Date: The date on which the workflow ends (not mandatory).
Time Zone: The time zone in which the workflow should be scheduled.
Frequency: Runs the workflow after this interval. You can select one of the predefined intervals. If you opt for the None option for the schedule interval, you can trigger the workflow manually.
Cron Expression: A cron expression is generated based on the frequency.
Check For Holidays: Select True to check whether the current execution date of the workflow falls on a given calendar holiday. If the current execution date falls on a calendar holiday, that execution of the workflow will not happen. If set to True, the following additional configurations are populated: Calendar and Include Weekend.
Calendar: Select the calendar that you created in the Calendars section.
Include Weekend: Select True or False, depending on whether you want to include weekends as holidays.
Number of Retries: Number of times Airflow restarts the workflow (in case of failure).
Retry Delay: If the number of retries is 1 or more, provide a value for Retry Delay (in seconds), which makes the workflow retry after the given delay.
Email on Retry: Set to True to send an email when a workflow retry is attempted; set to False otherwise.
Email on Failure: An email will be sent on failure of the workflow if the value is set to True. The default value is False.
Email: Provide mail IDs. If Email on Retry or Email on Failure is set to True, an email is sent to the given mail IDs. If the workflow fails, Airflow will retry the workflow and the mail will be sent accordingly.
Depends on Past: Set to True if the current run of the workflow depends upon its last run. The default is False.
Wait For Downstream: If True, a scheduled run will only be executed if the previously scheduled downstream task is complete; if the downstream task has failed, the scheduled task will wait until the downstream task is complete. If False, the scheduled task runs irrespective of the downstream task.
Max Active Runs: The number of instances of this workflow that can run concurrently.
RFC: This property can be used as an environment variable in the Bash Operator, i.e., $RFC.
CheckpointID: This property can be used as an environment variable in the Bash Operator, i.e., $CheckpointID.

When you select the schedule interval as None, the workflow can be triggered manually, as shown above.

Un-schedule Workflow

To remove a workflow from a schedule, click on the SCHEDULING option. This option is available on the Workflow tile under the menu button.

A new window will pop up and you can un-schedule the workflow.

You can always Reschedule the workflow, using the same steps.

Start Workflow

After scheduling the workflow, its status will change to SCHEDULED. Now, you can start the workflow by clicking the START button. If the workflow starts successfully, the status will change to ACTIVE.

View Summary

Workflow monitoring feature allows you to:

  1. View the workflow run history.

  2. View the component-level status and its logs details.

  3. Re-trigger a failed task.

  4. Re-trigger all the failed tasks with a single click.

The View Summary page takes you to the workflow monitoring dashboard, where you can see the last 5 runs and their details.

Run ID: The run ID identifies the current workflow run.
Status: The current status of the workflow run.
Start Time: The planned time for starting the workflow.
End Time: The planned time for ending the workflow.

Upon clicking the Run ID, you can view the workflow summary. If the workflow fails, the monitor window helps you resume and run the workflow from the failed stage. Shown below is a monitor window with a failed task. Right-click on the pipeline to view the logs and, in case of a failed task, re-trigger it, as shown in the images below.

As shown above, within the workflow summary DAG window you can perform the functions below:

Reload: The Reload tab lets you refresh and reload the status to verify the workflow's running stage.
View Logs: You can find the root cause of a failure by right-clicking and using the View Logs option.
Re-Trigger: You can re-trigger a failed task by clicking on it.
Re-Trigger Failed Task: In case of task failures, you can re-trigger all failed tasks with a single click.

Pause Workflow

Once the workflow is in ACTIVE state, you can PAUSE the workflow.

Resume Workflow

You can RESUME a paused workflow.

Delete Workflow

Click on Delete button, to delete the workflow.

Test Workflow

You can test the workflow before it is scheduled.

Once you click the Test button as shown below, a pop-up with the testing configuration will appear. Details of each field are explained below:

Time Zone: The time zone in which the workflow will be scheduled.
Check For Holiday: Check whether the test is scheduled on a holiday.
Calendar: Select the calendar against which the holiday schedule will be verified.
Include Weekend: Whether you want to include weekends as holidays.
Number of Retries: Number of times Airflow should try to start the workflow (if it fails).
Retry Delay: If the number of retries is greater than 0, the delay (in seconds, for example, 10 seconds) after which Airflow will restart the workflow.
Email on Retry: If the value is set to True, an email is sent when a workflow retry is attempted. The default value is False.
Email on Failure: If the value is set to True, an email is sent when the workflow fails. The default value is False.
Email: Provide mail IDs if Email on Retry or Email on Failure is set to True. An email is sent to the given mail IDs if the workflow fails or Airflow retries the workflow.

Here, the workflow will consider the current date/time (in the given time zone) as the start date. Once you click the Test button, the workflow is deployed and instantly scheduled to start.

During testing, the workflow runs only once. You will not be able to perform other operations like Start/Stop/Schedule/Resume. Once workflow testing is complete, the workflow is ready to be scheduled.

Once testing is done, the test status will be visible on the workflow tile. It shows whether the workflow succeeded or failed.

Connections

You can create connections in Airflow from Gathr. After clicking the + sign (Create Connection icon) on the workflow creation page, a pop-up (Create Connection) will appear. Provide the information required to create a connection in this tile.


Following types of connection can be created:

  • HDFS

  • HTTP

  • MSSQL

  • MySQL

  • Postgres

  • SFTP

  • SSH

  • Vertica
