Processors are built-in operators that process data by performing various transformations and analytical operations. Each processor is responsible for a specific action. You can drag and drop multiple processors and stitch them together to implement the business logic of a pipeline.
The AdvancedSort processor sorts the output fields before the data is processed further in the pipeline. Each field can then be sorted in ascending or descending order. You can specify multiple sort criteria on multiple fields to achieve secondary, tertiary, and n-level sorting on the input data.
Configuring AdvancedSort processor for Spark pipelines
Here, you can select the Column Order and the corresponding Sort Order, which can be set to either ascending or descending.
The output of this processor will arrange the column data in the selected sort order.
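Conceptually, multi-level sorting corresponds to the following PySpark sketch (a hypothetical example; the column names name, age, and salary are assumptions):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("advanced-sort-sketch").getOrCreate()

df = spark.createDataFrame(
    [("Alice", 30, 5000), ("Bob", 25, 7000), ("Alice", 25, 6000)],
    ["name", "age", "salary"],
)

# Primary sort on name (ascending), secondary sort on age (descending).
sorted_df = df.orderBy(F.col("name").asc(), F.col("age").desc())
sorted_df.show()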
The Alert processor notifies users about the occurrence of a specific event during pipeline execution, as configured by the user.
Configuring Alert processor for Spark pipelines
To add an Alert processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Click on the NEXT button. Specify the Alert criteria here.
Click on the NEXT button. Enter the notes in the space provided. Click Save for saving the configuration details.
To view generated Alerts, go to your Workspace > Alerts > Information.
The bell icon at the top right of the page shows notifications for generated alerts. For example, if you see 1 next to the bell icon, one alert has been generated.
If you click on the bell icon, you can view the generated alerts. The system displays the last twenty alerts at a time.
Refresh Interval: The time interval after which you want to refresh or reload the page. For example, if the refresh interval is 5 minutes, the page will be refreshed after 5 minutes.
The following two types of actions can be performed on the Alerts.
1. View Description: The alert description that you entered at the time of creating the alert can be viewed by clicking on the eye icon.
2. View Message Source: The source record for which the alert was generated can be viewed by clicking on the View Message Source icon.
The Aggregation processor is used for performing operations such as min, max, average, sum, and count over streaming data.
For example, suppose you have an IoT device that generates events comprising three fields: id, time, and action.
You wish to calculate how many actions of each type happened in a window of 15 seconds. You can use the Aggregation (COUNT) operation to calculate the number of actions of each type from the total events received in a 15-second window.
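A hypothetical PySpark sketch of the same windowed count (the column names id, time, and action follow the IoT example above; the timestamps are made up):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("aggregation-sketch").getOrCreate()

events = spark.createDataFrame(
    [(1, "2021-01-06 10:00:03", "click"),
     (2, "2021-01-06 10:00:07", "click"),
     (3, "2021-01-06 10:00:21", "scroll")],
    ["id", "time", "action"],
).withColumn("time", F.to_timestamp("time"))

# Count of each action type per 15-second window.
counts = events.groupBy(F.window("time", "15 seconds"), "action").count()
counts.show(truncate=False)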
The following table lists the functions supported by Aggregation processor.
Configuring an Aggregation processor for Spark pipelines:
To add an Aggregation processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Click on the NEXT button. Enter the notes in the space provided.
Click SAVE for saving the configuration details.
The App ID Generator processor adds a new column to the dataset whose value is in ddmmyyyy format, e.g., 06012021.
Configuring App ID Generator processor for Spark pipelines
To add the processor into your pipeline, drag the processor to the canvas and right click on it to configure. Specify the name of the column where the generated application ID will be stored. If the provided column name already exists, its values will be overwritten with the generated application ID.
The user can add further configurations by clicking the ADD CONFIGURATION button and entering a value. Click Next to proceed further.
Configuring the Cache for Spark pipelines.
To add the processor into your pipeline, drag the processor to the canvas and right click on it to configure.
The user can add further configurations by clicking on the ADD CONFIGURATION button. Click Next to proceed further.
Note: Cache cannot be performed over streaming data.
The Custom processor enables you to implement custom business logic. It is used for functionality that is not built into Gathr.
Implement the com.Gathr.framework.api.processor.JSONProcessor interface and provide the custom business logic in the implemented methods.
You will be able to perform operations on datasets which are not supported by other processors.
To use a custom processor, you need to create a jar file containing your custom code. Then upload the jar file in a pipeline or as a registered component.
To write the custom logic for your Custom processor, download the sample project.
Import the downloaded sample project as a maven project in Eclipse. Ensure that Apache Maven is installed on your machine and that the PATH for the same is set.
Implement your custom code. Build the project to create a jar file containing your code, using the following command:
mvn clean install -DskipTests
If the maven build is successful, you need to upload the jar on the pipeline canvas.
Custom processor for Spark supports writing code in Java programming language.
While using the Custom processor in a pipeline, you need to create a class that implements the com.Gathr.framework.api.spark.processor.CustomProcessor interface. Add the unimplemented methods in this class to write your own business logic in them.
Shown below is a sample class structure:
There are three methods to implement.
1. Init: Enables you to make any initialization calls.
2. Process: Contains the actual business logic. This method is called for each and every tuple.
3. Cleanup: All resource cleanup occurs in this method.
Configuring Custom processor for Spark pipelines
To add a Custom processor into your Spark pipeline, drag the Custom processor on the canvas. Right click on it to configure as explained below.
Click on the NEXT button. Enter the notes in the space provided.
Click SAVE for saving the configuration details.
The Container processor loads data into a Couchbase server and reads data from it.
This processor is used as a caching container which reads both the aggregated data and raw data from the bucket.
Configuring Container processor for Spark pipelines
To add a Container processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Click on the NEXT button. An option to Save as Template will be available. Add notes in the space provided and click on Save as Template.
Choose the scope of the Template and Click SAVE for saving the Template and configuration details.
The Decoder processor decodes fields that were previously encoded to hide sensitive data.
Base64 is a group of binary-to-text encoding schemes that represent binary data in an ASCII string format by translating it into a radix-64 representation.
Base64 encoding schemes are commonly used when binary data needs to be stored and transferred over media that are designed to deal with textual data. This ensures that the data remains intact without modification during transport.
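As a rough PySpark illustration of what decoding a Base64 column looks like (a hypothetical sketch; the column names payload_b64 and payload are assumptions):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("decoder-sketch").getOrCreate()

df = spark.createDataFrame([("SGVsbG8=",)], ["payload_b64"])

# unbase64 returns binary; cast to string to recover the readable text ("Hello").
decoded = df.withColumn("payload", F.unbase64("payload_b64").cast("string"))
decoded.show()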
Configuring Decoder for Spark pipelines
To add a Decoder processor into your pipeline, drag the processor to the canvas and right click on it to configure.
The list of columns in which the decoded value of the selected column will be stored. A new column name can be added to this field (the new column will be added to the dataset).
Click Next to get the schema detected. Click Next to add notes and then save the configuration.
Decryption processor is used to decrypt data coming from data source.
Configuring Decryption processor for Spark pipelines
To add a Decryption processor into your pipeline, drag the processor on to the canvas and right click on it to configure.
Format of the secret key, which can be hexadecimal or Base64 encoded. Encoding the binary key ensures that it remains intact without modification when it is stored or transferred.
After the decryption, the Detect Schema window will show the decrypted data.
Distinct is a core operation of Apache Spark over streaming data. The Distinct processor is used for eliminating duplicate records of any dataset.
Configuring Distinct processor for Spark pipelines
To add a Distinct processor into your pipeline, drag the processor on to the canvas and right click on it to configure.
Enter the fields on which distinct operation is to be performed.
Click on the NEXT button. Enter the notes in the space provided.
Click SAVE for saving the configuration details.
Note: Distinct can't be used right after Aggregation and Pivot processor.
Example to demonstrate how Distinct works:
If you apply Distinct on two fields, Name and Age, the output contains only the distinct combinations of those fields, as shown below:
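A minimal PySpark sketch of the same behavior (hypothetical data; the values are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("distinct-sketch").getOrCreate()

df = spark.createDataFrame(
    [("Mike", 30), ("Mike", 30), ("John", 25)],
    ["Name", "Age"],
)

# The duplicate ("Mike", 30) row is removed.
df.select("Name", "Age").distinct().show()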
In applications, you often encounter large datasets that contain duplicate records. To keep the data consistent and accurate, you need to remove the duplicates, retaining only one copy of each record.
Dedup processor returns a new dataset after removing all duplicate records.
Configuring Dedup processor for Spark pipelines
To add Dedup processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Additional properties can be added using ADD CONFIGURATION link.
Click on the NEXT button. Enter the notes in the space provided.
Click Save for saving the configuration details.
Example to demonstrate how Dedup works:
You have a dataset with following rows:
[Row(name='Alice', age=5, height=80),
Row(name='Alice', age=5, height=80),
Row(name='Alice', age=10, height=80)]
Now if Dedup columns are [age, height], then Dedup processor would return below dataset:
[Row(name='Alice', age=5, height=80),
Row(name='Alice', age=10, height=80)]
and if Dedup columns are [name, height], then Dedup processor would return below dataset:
[Row(name='Alice', age=5, height=80)]
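The same behavior can be reproduced with a small PySpark sketch (hypothetical, using dropDuplicates on a subset of columns):

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("dedup-sketch").getOrCreate()

df = spark.createDataFrame([
    Row(name="Alice", age=5, height=80),
    Row(name="Alice", age=5, height=80),
    Row(name="Alice", age=10, height=80),
])

df.dropDuplicates(["age", "height"]).show()   # keeps one row per (age, height)
df.dropDuplicates(["name", "height"]).show()  # keeps a single Alice row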
The Data Cleansing Processor is used to cleanse the dataset using the metadata. To add a Data Cleansing Processor into your pipeline, drag the processor to the canvas and right click on it to configure:
DQM stands for Data Quality Management. DQM processor defines the quality rules for data and performs necessary action on the faulty data.
Note: This processor does not work on Mozilla Firefox.
It performs a series of validations on individual data fields, along with actions that are triggered if an applied validation fails. A variety of validations is available for each data type, such as Not Null, Contains, Starts With, and Ends With.
DQM processor has seven categories of Validation functions, one for each Data Type.
Note: Array represents the JSON (not nested) array of primitive types (String, Number, Double and Boolean)
8. Opening Braces and Closing Braces
Use these braces for adding or separating two or more expressions.
DQM processor has three Action functions that are triggered when Validation fails:
1. Send To Error: It persists the invalid records in the configured error handler target and discards them from further processing in the pipeline. Right click the Send To Error button to open the error handler configuration. When data validation fails, a JSON message is sent to the configured error handler target. This JSON has the key “errorMessage”, which contains the expressions due to which the validation failed.
Note: The configuration used is the last Error Log Target configuration saved on Send To Error.
2. Assign Value: It gives the option to assign static or dynamic values to any message field.
Select any message field and apply validation on it. If the validation criteria is not met, assign static values to that field.
Let us apply validation on field “id” (id equals to 10).
Click on the Save button for saving the validation criteria.
Whenever the value of the field “id” is equal to “10”, the record will be processed as it is.
If the value of the field “id” is not equal to “10”, static value 100 will be assigned to that id.
Click on the Save button for saving the configuration settings.
Click Next and write the notes. The changes will be reflected in the Auto Inspect window.
In the screen above, wherever the id value was not 10, it was replaced with the static value 100.
Dynamic values can be assigned using the @ operator.
Select any message field and apply validation on it. If the validation criteria is not met, assign dynamic values to that field.
Let us apply validation on field “id” (id equals to 10).
Wherever the value of the field id is equal to 10, the record will be processed as it is. If this validation criteria is not met, dynamic value should be assigned to that id.
You can dynamically assign the values of any other field like crop, soil, climate, pest, cost_per_hectare (message fields) to the field id wherever validation criteria is not met.
Write @ in the values text box of Configuration Settings - Assign Value.
All the message fields will be displayed in the drop down list. Select the field whose value is to be dynamically assigned to field id.
For example, let us select field “soil” from the drop down list.
Write the expression as: @{soil}
The selected field value has to be enclosed within the curly braces.
After writing the expression, click on the Save button.
Wherever the value of the field id is 10, the record will be processed as it is.
Otherwise, the value of the field soil will be assigned to the id value in that column.
Click on the Save button for saving the configuration settings.
Click Next and write the notes. The changes will be reflected in the Auto Inspect window.
In the screen below, wherever the field id value was not 10, the value of the field soil was assigned to id.
Note: It is important to keep the data types in consideration while assigning dynamic values. The data types of the two columns should be the same, except for the String data type.
Dynamic Numeric value: For numeric fields, mathematical operations can be used for assigning the values.
Let us apply validation on field “id” (id equals to 10).
Wherever the value of the field id is equal to 10, the record will be processed as it is. If this validation criteria is not met, assign dynamic value to the field id using arithmetic operators.
Let us apply simple mathematical operation on the field “cost_per_hect”.
Write the expression as @{cost_per_hect+100}
The expression is evaluated by adding 100 to the cost per hectare value. The output is assigned to the field id wherever its value is not 10.
Click on the Save button for saving the configuration settings.
Click Next and write the notes. The changes will be reflected in Auto Inspect window.
Let us evaluate the columns “id and cost_per_hect”.
The value of cost_per_hect in the second row is 7193; if 100 is added to it, the output is 7293, and this value gets assigned to the field id.
The value of cost_per_hect in the third row is 7403; if 100 is added to it, the output is 7503, and this value gets assigned to the field id.
Accordingly, all id values get updated except where the id is 10.
3. Discard: It discards the record from further processing in the pipeline.
If a record is eligible for more than one action, the actions are executed in the following order:
1. Send To Error (Highest priority)
To add a DQM processor into your pipeline, drag the DQM processor to the canvas and connect it to a Data Source or processor. Right click on it to configure.
On the configuration canvas of Data Quality processor, select fields, then drag and drop the Validation and Action functions.
Choose a field in message panel from Configuration, respective data type validations will be expanded in function panel.
After selecting the message, drag and drop the validation functions and connect them.
Connecting lines show the Logical operator, AND/OR over it.
By default, the AND operator is selected, but it can be toggled to OR operator via right click on it.
Most validation functions in the Data Quality processor, such as Not Null, Is Empty, and Upper Case, do not require user input, so no error icon is visible on them. Some validation functions require user input and show a red error icon until they are configured.
Right click on the field (message) to configure the required input for validation function. Configuration panel also shows the Negate option so that same validation function can be utilized with reverse condition.
Note: The schema fields name should not match with any of the chosen function’s field name.
Example: For a schema field named start date, if a function is selected that has a field with a similar name, the value in that function's field will be filled from the schema field.
In such a scenario, for the expression to be successful, the schema field name must be changed so that it does not match the function field name.
Once all of the validations are configured, you can add an Action function at the end. This action function gets triggered if validation fails.
You can select multiple fields and define the validations.
Note: If you are using any Timestamp field for configuration, specify the value in the following format:
For a Date field, use the format below:
Using the Drools processor, you can perform rule-based processing on incoming data with the Drools rule engine.
To use this processor, the user needs to create a jar, either manually or using Drools Workbench (KIE jar), and upload it to the processor.
Configuring Drools processor for Spark pipelines
To add a Drools processor into your pipeline, drag the processor to the canvas and right click on it to configure.
The Drop processor removes the selected fields from the dataset, which means they will not be reflected in the schema.
Configuring Drop processor for Spark pipelines
To add a Drop processor into your pipeline, drag the processor to the canvas and right click on it to configure
Click NEXT and add notes if required.
Click SAVE for saving the configuration details.
Fields are encoded to hide sensitive data.
Base64 is a group of binary-to-text encoding schemes that represent binary data in an ASCII string format by translating it into a radix-64 representation.
Base64 encoding schemes are commonly used when binary data needs to be stored and transferred over media that are designed to deal with textual data. This ensures that the data remains intact without modification during transport.
Configuring Encoder for Spark pipelines
To add an Encoder processor into your pipeline, drag the processor to the canvas and right click on it to configure.
The list of columns in which the encoded value of the selected column will be stored. A new column name can be added to this field (the new column will be added to the dataset).
Click Next to get the schema detected.
Click Next to add Notes and save the configuration.
The Encryption processor allows data to be encrypted. For data coming out of a data source that requires further processing and data security, this processor hides the actual data during transformation or processing.
This is useful for hiding or transforming sensitive or confidential data such as credit card details, bank details, contact details, personal details, etc. The Encryption processor lets you encrypt that sensitive information.
Configuring Encryption for Spark pipelines
To add an Encryption processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Format of the secret key, which can be hexadecimal or Base64 encoded. Encoding the binary key ensures that it remains intact without modification when it is stored or transferred.
The Eviction processor can delete data from the containers that are configured in JDBC Container processors. The pipeline with the JDBC Container processor must have retention enabled.
You can use an Eviction processor to view and delete the retention data from JDBC oracle containers.
There are a few prerequisites for an Eviction processor:
• A pipeline that contains a JDBC Container processor with retention enabled. To do so, create a pipeline with the JDBC Container processor and set a retention policy.
• A successful Oracle database connection. The Eviction processor works only for Oracle databases.
Configuring Eviction processor for Spark pipelines
To add an Eviction processor into your pipeline, drag the processor to the canvas and right click on it to configure.
The details configured here will be used by the Eviction processor to delete the data, and include the following:
Click on the Save button if you do not wish to create a new template.
Use data eviction processor to delete the data according to the retention policy from the respective Oracle JDBC container.
For data eviction, select the Eviction processor and attach it to a configured dummy channel.
Select the oracle connection containing the required containers.
The list of Oracle JDBC containers (tables) will be shown on the UI.
Select or deselect the JDBC container as per the requirement.
The data of the selected containers will be deleted according to the retention information (policy) configured for the respective container.
This processor is responsible for performing transformation operations on the incoming dataset, e.g., replace, format, trim, uppercase, lowercase, etc. It uses the Spark expression language for preparing the transformation query.
Configuring Expression Evaluator for Spark pipelines
To add an Expression Evaluator processor into your pipeline, drag the processor to the canvas and right click on it to configure.
All the columns of the schema are populated and you can apply any transformation on it. The functions that can be applied are all listed in the right table, as shown in the figure.
Note: Refer Expression Evaluator section in Data Preparation section.
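For intuition, the kind of column-level transformation this processor performs corresponds roughly to Spark SQL expressions, as in this hypothetical PySpark sketch (the columns name and role are assumptions):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("expr-eval-sketch").getOrCreate()

df = spark.createDataFrame([(" john ", "dev")], ["name", "role"])

result = df.select(
    F.expr("upper(trim(name))").alias("name_clean"),     # trim + uppercase
    F.expr("concat(role, '_team')").alias("role_tag"),   # string concatenation
)
result.show()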
Computes the cosine inverse of the given value; the returned angle is in the range 0.0 through pi. Returns the computed cosine inverse in the range 0.0 through pi.
We have taken column1.colors as ["black","red"]; array_contains(@{column.schema.column1.colors},"red") will return true
Returns the maximum value in the array. NULL elements are skipped.
Returns the minimum value in the array. NULL elements are skipped.
Returns the (1-based) index of the first element of the array as long.
Computes the numeric value of the first character of the string column, and returns the result as an int column. arg0: The string for which the first character's numeric value to be calculated.
Computes the sine inverse of the given value; the returned angle is in the range -pi/2 through pi/2 Returns the computed sine inverse in the range -pi/2 through pi/2.
Computes the angle theta from the conversion of rectangular coordinates (arg0, arg1) to polar coordinates (r, theta).
Returns the string representation of the long value expr represented in binary arg0: The numerical Column to be converted to represented in binary.
Computes the value of the column arg0 rounded to 0 decimal places with HALF_EVEN round mode. arg0: The column for which value rounded to 0 decimal places with HALF_EVEN round mode to be calculated.
Converts the ASCII value to equivalent character. If n is larger than 256 the result is equivalent to chr(n % 256). Returns the ASCII character having the binary equivalent to expr.
Converts the ASCII character having the binary equivalent to expr. If n is larger than 256 the result is equivalent to chr(n % 256) Returns the ASCII character having the binary equivalent to expr.
Returns the first non-null argument if it exists; otherwise, null.
arg0:A number/String number column arg1:Integer value of base from which a number is to be converted arg2:Integer value of base to which a number is to be converted
Extracts the day of the month as an integer from a given date/timestamp/string. arg0: The date/timestamp/string from which the day of month to be extracted.
Extracts the day of the year as an integer from a given date/timestamp/string. arg0: The date/timestamp/string from which the day of year to be extracted.
Converts an angle measured in radians to an approximately equivalent angle measured in degrees. arg0: The column for which the equivalent angle measured in degrees to be calculated.
Computes the rank of a value in a group of values. Unlike the function rank, dense_rank will not produce gaps in the ranking sequence. select dense_rank() OVER (order by col) will return 1,2,3,4...
Separates the elements of array expr into multiple rows, or the elements of map expr into multiple rows and columns.
Separates the elements of array expr into multiple rows, or the elements of map expr into multiple rows and columns.
Parses the expression string into the column that it represents. expr("colA", "colB as newName") will return two columns colA and newName
Extracts the hours as an integer from a given date/timestamp/string. arg0: The date/timestamp/string from which the hours to be extracted.
Computes sqrt(arg0^2^ + arg1^2^) without intermediate overflow or underflow.
Given a date column, returns the last day of the month which the given date belongs to. arg0: The date from which last day of month to be extracted.
Returns the last value of expr for a group of rows. If isIgnoreNull is true, returns only non-null values. SELECT last_value(col) FROM VALUES (10), (5), (20) AS tab(col); will return 20
Computes the Levenshtein distance of the two given string columns. arg0: The first string column from which the Levenshtein distance from the second string column to be determined.
str like pattern - Returns true if str matches pattern, null if any arguments are null, false otherwise. SELECT '%SystemDrive%UsersJohn' like '%SystemDrive%Users%'; will return true
Left-pad the string column with the given string, to a given length. arg0: The string column to be left-padded. lpad("SQL Tutorial", 20, "ABC") will return "ABCABCABSQL Tutorial"
Removes the leading string contains the characters from the trim string. arg0:the trim string characters to trim, the default value is a single space.
Creates a map with a pair of the given key/value arrays. All elements in keys should not be null. map_from_arrays(array(1.0, 3.0), array('2', '4')) will return {1.0:"2",3.0:"4"}
Extracts the minutes as an integer from a given date/timestamp/string. arg0: The date/timestamp/string from which the minutes to be extracted.
Extracts the month as an integer from a given date/timestamp/string. arg0: The date/timestamp/string from which the month to be extracted.
Computes the percentage ranking of a value in a group of values. select percent_rank() OVER (order by col) will return values between 0.0 and 1.0
Separates the elements of array expr into multiple rows with positions, or the elements of map expr into multiple rows and columns with positions.
Separates the elements of array expr into multiple rows with positions, or the elements of map expr into multiple rows and columns with positions.
Returns the position of the first occurrence of substr in str after position pos. The given pos and return value are 1-based.
Converts an angle measured in degrees to an approximately equivalent angle measured in radians. arg0: The column for which equivalent angle measured in radians to be calculated.
arg0:A column needs to be repeated arg1:Integer value representing no of times arg0 is to be repeated
Computes the double value that is closest in value to the argument and is equal to a mathematical integer.
Computes the value of the column arg0 rounded to 0 decimal places. arg0: The column for which value rounded to 0 decimal places to be calculated.
Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the window partition. select row_number() OVER (order by col) will return 1,2,3,4....
Right-pad the string column with the given string, to a given length. arg0: The string column to be right-padded. rpad("SQL Tutorial", 20, "ABC") will return "SQL TutorialABCABCAB"
Trim the spaces from right end for the specified string value. arg0: The string column from which right spaces to be trimmed.
Extracts the seconds as an integer from a given date/timestamp/string. arg0: The date/timestamp/string from which the seconds to be extracted.
Calculates the SHA-1 digest for string and returns the value as a hex string We have taken column1 as 'ABC' sha1(@{column.schema.column1}) will return '3c01bdbb26f358bab27f267924aa2c9a03fcfdb8'
If the given value is a long value, this function will return a long value else it will return an integer value
If the given value is a long value, this function will return a long value else it will return an integer value
Subsets array x starting from index start (or starting from the end if start is negative) with the specified length.
Computes the substring from given string before given count occurrences of the given delimiter. arg0: The String column from which substring to be extracted.
Returns the angle measured in radians to an approximately equivalent angle measured in degrees.
Parses the date_str expression with the fmt expression to a date. Returns null with invalid input. By default, it follows casting rules to a date if the fmt is omitted.
to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy')) will return {"time":"26/08/2015"}
arg1:The function to apply on each element of the array expression. SELECT transform(array(1, 2, 3), x -> x + 1); will return [2,3,4]
Translate any character in the given string by a given character in given replaceString. arg0: The string column in which the translation to be done.
Returns a universally unique identifier (UUID) string. The value is returned as a canonical 36-character UUID string.
Returns true if the XPath expression evaluates to true, or if a matching node is found. SELECT xpath_boolean('<a><b>1</b></a>','a/b'); will return true
Returns a double value, the value zero if no match is found, or NaN if a match is found but the value is non-numeric. SELECT xpath_double('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3.0
Returns a float value, the value zero if no match is found, or NaN if a match is found but the value is non-numeric. SELECT xpath_float('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3.0
Returns an integer value, the value zero if no match is found, or NaN if a match is found but the value is non-numeric. SELECT xpath_int('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3
Returns a long value, the value zero if no match is found, or NaN if a match is found but the value is non-numeric. SELECT xpath_long('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3
Returns a double value, the value zero if no match is found, or NaN if a match is found but the value is non-numeric. SELECT xpath_number('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3.0
Extracts and returns the text contents of the first xml node that matches the XPath expression. SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c'); will return cc
Extracts the year as an integer from a given date/timestamp/string. arg0: The date/timestamp/string from which the year to be extracted.
This processor is responsible for performing filtering operations on the incoming dataset, e.g., equals, contains, not-null, ranges, matches, starts-with, ends-with, etc. It uses the Spark expression language for constructing the filter criteria.
Configuring Expression Filter for Spark pipelines
To add an Expression Filter processor into your pipeline, drag the processor to the canvas and right click on it to configure.
The user can apply any transformation and the list of functions that can be applied are all listed in the right table on the user interface.
Data filtering is required to refine datasets so that you get only what you need, excluding data that is repetitive, irrelevant, or sensitive. Different types of filters can be used to query results or other kinds of information.
The Filter processor is used to display specific records by using filter criteria. For example, you can define the filter rule: grade contains “software” and name contains “Mike”. The system will fetch all the records that meet these criteria.
Configuring Filter processor for Spark pipelines
To add a Filter processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Click on the Save button if you do not wish to create a new template.
The Field Splitter splits string data based on a regular expression and passes the separated data to new fields. Use the Field Splitter to split complex string values into logical components.
For example, if a field contains an error code and error message separated by a comma, you can use the comma to separate the code and message into different fields.
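A hypothetical PySpark sketch of that comma-split example (the column names error, error_code, and error_message are assumptions):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("field-splitter-sketch").getOrCreate()

df = spark.createDataFrame([("E42,Disk quota exceeded",)], ["error"])

parts = F.split(F.col("error"), ",")            # regular-expression based split
result = (df.withColumn("error_code", parts.getItem(0))
            .withColumn("error_message", parts.getItem(1)))
result.show(truncate=False)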
Configuring Field Splitter processor for Spark pipelines
To add Field Splitter processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:
You can use this processor to edit the data types of the schema. The Field Converter processor generates the schema, and the fields are editable after the schema is detected.
Configuring Field Converter processor for Spark pipelines
To add Field Converter processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below.
On the first page you will be asked to Add Configuration, which is optional.
Once you click Next, the schema is detected and you can edit the Data Type as required.
Field Replacer allows you to replace a field value with a hard-coded value (e.g., null with "some value"), replace a value that follows a pattern by providing a regex, and replace a field value with another field value or a combination of field values.
You can replace the input field value if the regex pattern matches.
There are two options to provide values:
1. The value can be a combination (concatenation) of two columns.
For example, in the image below it is “JOBROLE” CONCAT “NAME”.
The order in which you select the columns is used for the combination.
2. Enter a single value to replace with.
Configuring Field Replacer processor for Spark pipelines
To add a Field Replacer processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:
After configuring the component, detect the schema for desired result.
The image below confirms that, in the input field (NAME), values matching the regex pattern (sagar|Nikhil) were replaced with the user value and a new column (NEWNAME) was created.
Field Flattener allows you to flatten a nested record to produce a record with no nested fields. It can also flatten specific list or map fields.
By default, the new flattened field names are separated by “_”.
Let’s take an example of a nested JSON.
Specify the path of the input JSON in the record so that all nested structures under that path are flattened.
Flatten Example 1: The JSON below has nested fields under Employee: ID, Name, Permanent, and Address. Using the Field Flattener, you will be able to flatten the nested fields, i.e., Street, City, and Zipcode, as shown below:
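A rough PySpark sketch of flattening the Address struct with the default “_” separator (hypothetical; the field names and values are assumptions based on the example above):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("flattener-sketch").getOrCreate()

df = spark.createDataFrame(
    [("1", "Tom", True, ("MG Road", "Pune", "411001"))],
    "ID string, Name string, Permanent boolean, "
    "Address struct<Street:string, City:string, Zipcode:string>",
)

# Each nested field becomes a top-level column named parent_child.
flat = df.select(
    "ID", "Name", "Permanent",
    F.col("Address.Street").alias("Address_Street"),
    F.col("Address.City").alias("Address_City"),
    F.col("Address.Zipcode").alias("Address_Zipcode"),
)
flat.show()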
Configuring Field Flattener processor for Spark pipelines
To add Field Flattener processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:
Flatten list and map fields that contain additional nested list or map fields. Specify the path to the field, for example: Employee.
The Functions processor enables you to apply Spark SQL functions to the dataset.
Configuring Functions processor for Spark pipelines
To add Functions processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:
Click on the NEXT button. You will see the fields generated from the data source. You can provide a new output format for the date field. For example, if the input date format was yyyy-dd-yy, specify the new date format as yyyy/MM/dd.
Click Next after changing the Date format. Enter the notes in the space provided.
Click SAVE for saving the configuration details.
The HTTP processor is used to enrich the incoming data with the responses of external REST calls. The user can consume different URLs in parallel and use their responses to enrich the resulting data, either in a new column or an existing column.
The user can create and run multiple URLs by clicking on the ADD FIELD option.
Hash fields based on different Hashing Algorithms, to hide sensitive information.
Hashing is a technique in which an algorithm (also called a hash function) is applied to a portion of data to create a unique, fixed-size digital “fingerprint”. If anyone changes the data by so much as one binary digit, the hash function will produce a different output (called the hash value) and the recipient will know that the data has been changed. Hashing can ensure integrity and provide authentication as well.
The hash function cannot be “reverse-engineered”; that is, you can't use the hash value to discover the original data that was hashed. Thus, hashing algorithms are referred to as one-way hashes. A good hash function will not return the same result from two different inputs (called a collision); each result should be unique.
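As an illustration, hashing a sensitive column with SHA-256 (one of the algorithms such a step might use) could look like this hypothetical PySpark sketch; the column names are assumptions:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("hashing-sketch").getOrCreate()

df = spark.createDataFrame([("4111111111111111",)], ["card_number"])

# Replace the raw value with its one-way SHA-256 digest.
hashed = df.withColumn("card_number_hash", F.sha2(F.col("card_number"), 256))
hashed.show(truncate=False)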
Configuring Hashing processor for Spark pipelines
To add Hashing processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:
After configuring all the fields, click Next; the schema will be detected and then you can verify and save the configuration.
The JDBC Container processor allows you to perform Read, Aggregation, Merge, and Write operations over an Oracle connection.
A JDBC Container allows the implementation of a retention policy as well. A retention policy lets you cleanse/delete unwanted data using a retention period.
You can also apply a checksum in the case of aggregation to avoid duplication of records.
Configuring JDBC-Container processor for Spark pipelines
To add a JDBC Container Processor into your pipeline, drag the processor on the canvas and right click on it to configure.
After the configuration comes the Mapping page, where you can map the incoming schema to the fetched columns. The table above shows the properties and their descriptions.
Once the configuration is finalized, click Done to save the file.
A join processor can create a relation between two incoming messages (DataSets).
Configuring Join processor for Spark pipelines
To add a Join Processor into your pipeline, drag the processor on the canvas and right click on it to configure.
The first tab is Join Condition. It allows you to define the join type between two DataSets, where a join condition can be applied to columns from both DataSets.
The second tab is Broadcast Table. It guides Spark to broadcast the specified table when joining it with another table.
The third tab is Filter Condition. It allows you to apply a filter condition to the DataSets. You can combine two filter conditions with AND and OR operations.
The filter conditions that can be applied are as follows:
The fourth tab is Join Projection, where you can apply queries and expressions to the columns.
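Put together, these tabs correspond roughly to the following hypothetical PySpark sketch (a broadcast inner join, a filter, and a projection; the tables and columns are made up):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("join-sketch").getOrCreate()

orders = spark.createDataFrame([(1, "P1", 300), (2, "P2", 100)],
                               ["order_id", "product_id", "amount"])
products = spark.createDataFrame([("P1", "Banana"), ("P2", "Carrot")],
                                 ["product_id", "name"])

joined = (
    orders.join(F.broadcast(products), on="product_id", how="inner")  # join condition + broadcast table
          .filter(F.col("amount") > 150)                              # filter condition
          .select("order_id", "name", "amount")                       # join projection
)
joined.show()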
Jolt is a JSON transformation library written in Java where the specification for the transformation is a JSON document.
The Jolt processor is used for performing JSON-to-JSON transformations in Java. It applies the transformation to the input JSON document and generates a new JSON document as per the Jolt specification.
The Jolt processor transforms the structure of JSON data and does not manipulate the values.
Each transform has its own Domain Specific Language which facilitates its job.
Refer following links for detailed information:
https://github.com/bazaarvoice/jolt
https://jolt-demo.appspot.com/#inception
Configuring Jolt processor for Spark pipelines
To add a JOLT Processor into your pipeline, drag the processor on the canvas and right click on it to configure.
Select the Jolt specification JSON file to be uploaded. Click on the UPLOAD FILE button, browse to the location where the JSON file is saved, and double-click it to upload.
The content of the specification JSON file is displayed below. This specification is applied to the incoming JSON document and generates the transformed output.
Additional properties can be added using ADD CONFIGURATION link.
Click on the NEXT button. Enter the notes in the space provided. Click Save for saving the configuration details.
The JSON.parse() method parses a JSON string, constructing the JavaScript value or object described by the string. Below are the configuration details of the JSON Parser:
The Limit processor is used to fetch specified number of rows from the dataset.
For example, if 1000 records are getting generated and you want to fetch only 50 rows, you can make use of this processor.
These 50 rows will be carried to the next component.
This processor can only be used with Batch components.
Note: If you try to add Limit processor after Streaming Data Source, it will not get added and you will get a warning message.
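Conceptually this is the same as a limit on a batch DataFrame, as in this hypothetical PySpark sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("limit-sketch").getOrCreate()

df = spark.range(1000)        # 1000 records
first_50 = df.limit(50)       # only 50 rows flow to the next component
print(first_50.count())       # 50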
Configuring Limit Processor for Spark Pipelines
To add Limit processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Specify the number of rows to be fetched in the “No of Elements” field.
Additional configuration details can be added using ADD CONFIGURATION link.
Click on the Next button. Enter the notes in the space provided. Click Save for saving the configuration details.
Field masking is done to hide sensitive data.
Configuring Masking processor for Spark pipelines
To add a Masking processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below
Configurable fields are as follows:
The screenshot shown below has two fields, one with the Masking Type and the other with the Input Field to be masked.
Click Next to get the schema detected.
As shown above, all the characters of the field fieldname are masked, while only the head characters of the newcol field are masked.
Click Done to save the configuration.
The NA processor is used to handle the null values of any dataset.
If the value specified is Number, all null values of selected input field will be replaced by the specified number.
If the value specified is String, all null values of selected input field will be replaced by the given string.
For example, if there is any dataset in which input fields “Name” and “Age” are null, you can specify value for Name as “Mike” and “Age” as “30” that should substitute the null values.
Note: It does not support Date and Timestamp fields.
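A hypothetical PySpark sketch of the Name/Age example above (dates and timestamps are not covered):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("na-sketch").getOrCreate()

df = spark.createDataFrame([("Anna", 25), (None, None)], ["Name", "Age"])

# Substitute nulls per column.
filled = df.na.fill({"Name": "Mike", "Age": 30})
filled.show()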
Configuring NA processor for Spark pipelines
To add NA Processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Select the field from the drop-down list whose null values need to be replaced. The null values of the input field will be replaced with the specified value.
Click on the NEXT button. Enter the notes in the space provided.
Click SAVE for saving the configuration details.
Pivot is an aggregation processor in which one of the grouping columns has distinct values that are later transposed to form individual columns. These column values further help in data analysis and reporting.
Configuring Pivot processor for Spark pipelines:
To add Pivot processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Click on the NEXT button. You can view the new schema based on your pivot configuration on Schema tab.
Click SAVE for getting the final output of pivot processor.
Let us say you have a dataset comprising five records with four fields: “OrderId”, “Product”, “Category”, and “Amount”.
You wish to group on the basis of field “Product”, pivot on column “Category” and type of aggregation to be applied is “SUM” on column “Amount”.
The Pivot processor applies the aggregation operation (SUM) on the pivot column and creates a pivot table.
Since you applied the operation SUM on the column “Amount”, it calculates the sum of the amounts for the different vegetables and fruits.
The sum for the fruit banana is 300, for the vegetable broccoli it is 400, and for carrot it is 100.
If no amount is found for a particular vegetable, it will show null values against the product.
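A hypothetical PySpark sketch of this example (the values are made up to match the sums described above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pivot-sketch").getOrCreate()

df = spark.createDataFrame(
    [(1, "Banana", "Fruit", 300),
     (2, "Broccoli", "Vegetable", 400),
     (3, "Carrot", "Vegetable", 100)],
    ["OrderId", "Product", "Category", "Amount"],
)

# Group on Product, pivot on Category, and SUM the Amount column.
pivoted = df.groupBy("Product").pivot("Category").sum("Amount")
pivoted.show()   # missing combinations appear as null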
In the components panel, under the Processors tab, select the Processor Group component and configure it.
Note: To understand more about the Processor Group, see Processor Group.
You will have an accordion view of the mapped input channel(s) with the incoming dataset(s).
The processor group input source name is mapped against the input dataset.
All the fields within the input channel listed in the table are mapped against the columns of the corresponding dataset. Here, you can select column(s) of the dataset or write an expression against the field(s) of the input channel.
Note: In the processor group configuration panel the input channel(s) that were selected in the processor group page are mapped/configured against the input dataset(s) chosen in the data pipeline page.
Click Next to detect schema and click Done.
The user can connect any processor/emitter.
Common scenarios where Processor Groups could be used
1. If the user has a single input data source – processor group(s) – Emitter(s).
2. If the user has multiple input data sources – processor group(s) – Emitter(s).
The Python processor allows you to perform following operations:
• Write custom Python Spark code for defining transformations on Spark DataFrames.
• Write custom Python code for processing input records at runtime.
Gathr provides support for two python versions, Python 2 and Python 3. Multiple version support enables a python processor to run on different python versions.
Configuring Python processor for Spark pipelines
To add a Python processor into your pipeline, drag the processor on the canvas and right click on it to configure. To select a version from Python 2 and Python 3, select either from Python Version property.
Note: Code snippets contains sample codes for your reference.
To pass configuration parameters in the Python processor:
You can provide configuration parameters to the Python processor in the form of key-value pairs. These parameters are available as a dictionary passed as the second argument to the function given in the Function Name field. So the function given in the Function Name field takes two arguments: (df, config_map)
The first argument is the DataFrame and the second argument is a dictionary that contains the configuration parameters as key-value pairs.
In the example below, the HDFS path of a trained model is provided as a configuration parameter with the key ‘model_path’, and the same is used in predictionFunction.
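A minimal sketch of what such a function might look like (hypothetical; the actual model-loading and scoring code is omitted and the added column is for illustration only):

from pyspark.sql.functions import lit

def predictionFunction(df, config_map):
    # df: input DataFrame; config_map: dict of configuration key/value pairs
    model_path = config_map.get("model_path")   # HDFS path supplied as a configuration parameter
    # Load the trained model from model_path and score df here (implementation specific).
    # For illustration, just record which path was used.
    return df.withColumn("model_path_used", lit(model_path))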
If there is any error or exception in the Python code, an error/exception is generated.
For example, a simple program is written in the text area of the Python processor in which the variable model is used but not defined; hence an exception with the message ‘global name model is not defined’ is displayed on the screen.
Click on the NEXT button after specifying the values for all the fields.
Enter the notes in the space provided.
Click on the SAVE button after entering all the details.
Use rank functions (rank, dense_rank, percent_rank, ntile, row_number) while configuring the Rank processor.
Rank Processor computes the rank of a value in a group of values. The result is one plus the number of rows preceding or equal to the current row in the ordering of the partition.
dense_rank() - Computes the rank of a value in a group of values. The result is one plus the previously assigned rank value. Unlike the function rank, dense_rank will not produce gaps in the rankings.
percent_rank() - Computes the percentage ranking of a value in a group of values.
ntile(n) - Divides the rows for each window partition into n buckets ranging from 1 to at most n.
row_number() - Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the window partition.
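These functions correspond to Spark window functions, as in this hypothetical PySpark sketch (the dept and salary columns are assumptions):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("rank-sketch").getOrCreate()

df = spark.createDataFrame(
    [("HR", 3000), ("HR", 3000), ("HR", 4000), ("IT", 5000)],
    ["dept", "salary"],
)

w = Window.partitionBy("dept").orderBy(F.col("salary").desc())

ranked = df.select(
    "dept", "salary",
    F.rank().over(w).alias("rank"),
    F.dense_rank().over(w).alias("dense_rank"),
    F.percent_rank().over(w).alias("percent_rank"),
    F.ntile(2).over(w).alias("ntile_2"),
    F.row_number().over(w).alias("row_number"),
)
ranked.show()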
Configuring Rank processor for Spark pipelines
To add a Rank processor into your pipeline, drag the processor on the pipeline canvas and right click on it to configure.
Click on NEXT button. Enter the notes in the space provided. Click on the SAVE button for saving the configuration details.
The Repartition processor changes how pipeline data is partitioned by dividing large datasets into multiple parts.
The Repartition processor is used when you want to increase or decrease the parallelism in an executor. The degree of parallelism maps to the number of tasks running in an executor. The processor creates either more or fewer partitions to balance data across them and shuffles data over the network.
For example, you can use single partition if you wish to write data in a single file. Multiple partitions can be used for writing data in multiple files.
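A hypothetical PySpark sketch of both cases:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("repartition-sketch").getOrCreate()

df = spark.range(1000000)

single = df.coalesce(1)        # one partition -> a single output file when written
parallel = df.repartition(8)   # shuffle into 8 partitions for more parallel tasks
print(single.rdd.getNumPartitions(), parallel.rdd.getNumPartitions())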
Configuring Repartition processor for Spark pipelines
To add a Repartition processor into your pipeline, drag the processor on the pipeline canvas and right click on it to configure
Enter the number of partitions in the Parallelism field.
Click on the NEXT button. Enter the notes in the space provided.
Click on the Done button after entering all the details.
This processor is used for fetching historical data from any streaming or batch source, and that data gets registered as a table. This table can be referred to later if you wish to run queries on the registered data sources.
You can fetch tables from Cassandra, Couchbase, Redshift, HDFS, Hive, JDBC, Snowflake, File, Incoming Message and S3.
For example, you have historical data on HDFS which contains information about the different departments of an organization. You can register that data using the Register as Table processor, and the registered table can then be used in the SQL processor to fetch the number of employees in the organization.
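In plain Spark terms, registering and querying a table looks roughly like this hypothetical sketch (the HDFS path, table name, and column names are assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("register-table-sketch").getOrCreate()

departments = spark.read.json("hdfs:///data/org/departments")  # assumed HDFS location
departments.createOrReplaceTempView("departments")             # register as a table

spark.sql("SELECT department, count(*) AS employees "
          "FROM departments GROUP BY department").show()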
Configuring Register as Table processor for Spark pipelines
To add a Register as Table processor into your pipeline, drag the processor on the canvas and right click on it to configure.
If data source selected is HDFS, there will be following fields:
If you select the Data Source as HIVE or JDBC, there will be two additional fields:
If the user selects Snowflake, the field mentioned below will be additional.
The user will be required to provide the connection name for creating the connection, and to provide the schema name against this column. Note: The user can provide the database table name or provide a query.
If Data Source selected is S3, there will be one additional field:
If the option selected is incoming message, output of sources connected before this processor will act as an incoming message for Register As Table processor.
You need to specify the name with which table is to be registered after fetching the data.
If you select the Data Source as Cassandra, there will be two additional fields:
If you select the Data Source as Couchbase, there will be an additional field:
If you select the Data Source as Redshift, there will be a few additional fields, depending on the two following options:
Rename processor enables renaming the Fields. The renamed columns will be reflected in the Schema.
Configuring Rename processor for Spark pipelines
To add a Rename processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below.
Click on NEXT button. Enter the notes in the space provided. Click on the SAVE button for saving the configuration details.
Router enables routing multiple data streams from a Router Processor to any emitter/processor according to the route selected.
Configuring Router processor for Spark pipelines
To add a Router processor into your pipeline, drag the processor to the canvas and right click on it to configure.
The properties shown in above image are explained below:
Amazon SageMaker allows you to build, train, and deploy machine learning models quickly. Amazon SageMaker is a fully managed service that covers the entire machine learning workflow to label and prepare your data. Choose an algorithm, train it, tune and optimize it for deployment, make predictions, and take action. Your models get to production faster with much less effort and lower cost. Once you have trained a model on SageMaker, you can deploy it as an endpoint, which can be further utilized in StreamAnalytix for prediction over incoming data.
Configuring Sagemaker processor for Spark pipelines
To add a SageMaker processor into the pipeline, drag the processor to the canvas and right click on it to configure as explained below:
Out of all the built-in algorithms supported by SageMaker, the user can utilize the endpoints of the algorithms mentioned below for scoring over incoming data using Gathr.
1. Linear Learner Binary Classification
SageMaker support in StreamAnalytix covers custom models as well. For example, one can train a model using the scikit-learn API and then deploy that model as an endpoint on SageMaker for prediction. Once the endpoint is in the InService state, it can be used for prediction in Gathr.
The below table has Algorithms with their corresponding output field tabs:
The Select processor is used to retrieve specific columns from the dataset.
On configuration settings page, specify the schema fields that you wish to retrieve in the output.
For example, you entered the fields: OrderId, ShipVia and Freight.
Only these three fields will flow to the next component which is connected in the pipeline.
Click on NEXT button. Enter the notes in the space provided. Click on the SAVE button for saving the configuration details.
SQL Processor allows you to run SQL queries over streaming data and registered tables. It provides a common way for accessing different data sources.
For example, if you want to continuously analyze a weather data stream and find the average temperature recorded every minute, you can use a SQL query. The output would be a stream of new records showing the average temperature recordings.
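A hypothetical PySpark sketch of that query over a registered table (the table and column names are assumptions, shown here on a small batch sample):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("sql-sketch").getOrCreate()

weather = spark.createDataFrame(
    [("2021-01-06 10:00:10", 21.5),
     ("2021-01-06 10:00:40", 22.5),
     ("2021-01-06 10:01:05", 20.0)],
    ["reading_time", "temperature"],
).withColumn("reading_time", F.to_timestamp("reading_time"))

weather.createOrReplaceTempView("weather")

spark.sql("""
    SELECT window(reading_time, '1 minute') AS minute,
           avg(temperature) AS avg_temp
    FROM weather
    GROUP BY window(reading_time, '1 minute')
""").show(truncate=False)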
Configuring SQL processor for Spark pipelines
To add an SQL processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:
Click on the NEXT button. Enter the notes in the space provided.
Click SAVE for saving the configuration details.
Scala is a language that runs on the Java Virtual Machine and is compatible with Java. You can code in Scala in a manner similar to Java, and it has minimal prerequisites. Writing code in Scala is faster and easier.
The Scala processor is used for writing custom code in Scala language.
Configuring Scala processor for Spark pipelines
To add a Scala processor into your pipeline, drag the processor on the canvas and right click on it to configure.
Next, click on the Jar Upload tab. A jar file is generated when you build the Scala code. You can upload third-party jars from here so that their APIs can be utilized in the Scala code by adding the corresponding import statements.
Click on the NEXT button. Enter the notes in the space provided.
Click SAVE for saving the configuration details.
By using the Schema Flattener, the user can flatten the incoming JSON as per the metadata provided through the S3/RDS/Upload feature.
Schema Transformer transforms the incoming schema into the format expected by the next processor. In other words, you can use this processor in pipelines where you want to transform the incoming schema.
You can use schema transformation processor for performing cast, rename and projection operation on incoming schema collectively.
As shown in the figure below, you can edit the Column Alias and set a new Data Type. The transformed schema will further flow into the pipeline and the new schema will be available on the next processor.
The checkbox allows you to select the columns that can be projected on to the next operator.
Configuring Schema Transformer processor for Spark pipelines
To add a Schema Transformer processor into your pipeline, drag the processor on the canvas and right click on it to configure.
Click on the NEXT button. Enter the notes in the space provided.
Click DONE for saving the configuration details.
The Sequence Generator processor allows you to generate a unique ID for all the records produced by the processor; in other words, it generates a sequence in the pipeline.
This processor will generate sequence in a field which can be used further in a pipeline.
Configuring Sequence processor for Spark pipelines
To add a Sequence processor into your pipeline, drag the processor to the canvas and right click on it to configure. There are three options to generate the type of sequence for the records.
Sequence: generates a numeric sequence of values. Composite Key: the combination of fields selected, in order, will serve as the primary key.
Now, in case of Sequence, the following properties will be generated:
In case of Composite Key, the following properties will be generated:
Composite Key: The combination of fields selected in order will serve as primary key. Keys will be created in the order of the selection of the fields.
Snowflake is a cloud-based data warehouse system. The user can use Snowflake as a processor in Gathr for ELT pipelines.
Sorting is arranging the data in ascending or descending order.
For example, you can use this processor for sorting the height of students in ascending order.
Configuring Sort processor for Spark pipelines
To add a Sort processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Enter the notes in the space provided.
Click SAVE for saving the configuration details.
Note: In batch pipelines, the Sort processor can be used without the Aggregation processor, whereas in streaming pipelines, the Sort processor can be used only after the Aggregation processor.
The Stored Procedure processor is used to run a set of SQL queries. The user can invoke stored procedures and functions defined in the database from within Gathr. While the input data is passed as an argument to functions and stored procedures, the resulting output can be used to enrich the incoming data.
Instead of sending multiple SQL statements to the database server, an application can send a set of queries in what is called a stored procedure.
A function is a set of SQL statements that perform some operation and return a result.
Top-N queries are queries that limit the result to a specific number of rows. These are often queries for the most recent or the “best” entries of a result set. For efficient execution, the ranking must be done with a pipelined order by. You can get Top-N records based on a sorting criterion.
Top N Records works with Batch Pipeline only.
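A hypothetical PySpark sketch of a Top-N query on a batch dataset (sort on the criterion and keep the first N rows; the data is made up):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("topn-sketch").getOrCreate()

df = spark.createDataFrame(
    [("P1", 300), ("P2", 100), ("P3", 400)], ["product", "amount"]
)

top_2 = df.orderBy(F.col("amount").desc()).limit(2)   # top 2 by amount
top_2.show()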
Configuring TopNRecords processor for Spark pipelines
To add a TopNRecords processor into your pipeline, drag the processor to the canvas and right click on it to configure.
Click Next to detect schema and Save the configuration.
Union processor is not configurable.
The Union operation is used when you want to combine elements of two datasets.
Example to demonstrate how union works.
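A hypothetical PySpark sketch of combining two datasets with matching schemas:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("union-sketch").getOrCreate()

df1 = spark.createDataFrame([("Mike", 30)], ["Name", "Age"])
df2 = spark.createDataFrame([("John", 25)], ["Name", "Age"])

# Rows of both datasets are combined (duplicates are kept).
combined = df1.union(df2)
combined.show()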
The Watermark processor provides the Spark watermarking feature for late-data handling and stream-stream joins. The user can specify the event-time column and the delay for the incoming data to enable watermarking on the dataset.
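A hypothetical PySpark Structured Streaming sketch of watermarking (the built-in rate source is used here only to have a runnable timestamp column):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("watermark-sketch").getOrCreate()

# The "rate" source provides a timestamp event-time column.
events = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

late_tolerant = (
    events.withWatermark("timestamp", "10 minutes")      # tolerate data up to 10 minutes late
          .groupBy(F.window("timestamp", "5 minutes"))
          .count()
)

query = late_tolerant.writeStream.outputMode("update").format("console").start()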
Configuring Watermark processor for Spark pipelines
Select the Event Time Column. The Event Time column is the timestamp field from the incoming data on the basis of which the user can enable watermarking.
The user can add further configuration by clicking on the ADD CONFIGURATION button. Click Next to proceed further.
To add an XML Parser processor into your pipeline, drag the processor to the canvas and click on it to configure.
By using the XSLT processor, the user can transform the incoming XML input into another, transformed XML by using an XSL file from S3 or HDFS.
To add an XSLT processor into your pipeline, drag the processor to the canvas and click on it to configure.