Python Processor
- Processor Configuration
- Ask AI Assistant
- Notes
- Code Snippets
- Add a new column with constant value to existing dataframe using Pyspark
- Add a new column with random value to existing dataframe using Pyspark
- Add a new column using expression to existing dataframe using Pyspark
- Transform an existing column using Pyspark
- Transform an existing column using udf
- Apply transformation using pandas in batch pipeline
- Apply transformation using pandas in streaming pipeline (using pandas udf)
- Make a POST web service call using Pyspark
- Extract top-N records using Pyspark
- Drop/Delete a column using Pyspark
- Web Scraping using Python
- Text Chunking using Textwrap
- Parse docx, pptx, xlsx and Images
The Python processor allows you to perform the following operations:
- Write custom Python Spark code to define transformations on Spark DataFrames.
- Write custom Python code to process input records at runtime.
Please email Gathr Support to enable the Python Processor.
Several code snippets are available in the application and are explained in this topic to get you started with the Python processor.
Processor Configuration
Utilize Python Virtual Environment
Enable this option to use a Python virtual environment for the Python Processor.
Python Packages
Enter Python package names separated by newlines or upload a package list.
Download a sample package list for reference.
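For illustration only, a newline-separated package list might look like the following; the package names below are examples, not requirements of the processor.

numpy
pandas
requests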
Input Type
There are two ways to configure the Python processor:
Inline: This option enables you to write Python code in a text editor. If selected, one additional field, Python Code, appears.
Python Code: Enables you to write custom Python code directly in the text editor.
Upload: This option enables you to upload one or more Python scripts (.py files) and Python packages (.egg/.zip files). You have to specify the module name (it should be part of the uploaded files or package) and the name of the function that the Python processor will call.
When you select Upload, the UPLOAD FILE option appears on the screen. Browse and select the files to be used in the Python processor.
One additional field, Import Module, also appears on the screen if the Upload option is selected.
Import Module
Specify the module that contains the function to be called by the Python processor. The drop-down lists all uploaded files, but shows only .py files. You can also type a module name if it does not appear in the drop-down list.
Function Name
Name of the Python function that is defined in the inline Python code or in the uploaded script.
Add Configuration: Enables you to add additional properties, which are used to pass configuration parameters to the Python processor.
You can provide configuration parameters to the Python processor as key-value pairs. These parameters are available as a dictionary that is passed as the second argument to the function named in the Function Name field. The function therefore takes two arguments: (df, config_map), where the first argument is the dataframe and the second is a dictionary containing the configuration parameters as key-value pairs. A minimal sketch of this contract is shown below.
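For illustration only, a function matching this contract might look like the sketch below; the configuration key greeting_text and the column Greeting are hypothetical names used to show how values added via Add Configuration reach the code.

from pyspark.sql.functions import lit

def apply_config(df, config_map):
    # 'greeting_text' is a hypothetical key added via Add Configuration
    greeting = config_map.get('greeting_text', 'hello')
    # use the parameter value in a transformation
    df = df.withColumn('Greeting', lit(greeting))
    return df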
Ask AI Assistant
Use the AI assistant feature to simplify the creation of Python code.
It allows you to generate complex Python code effortlessly, using natural language inputs as your guide.
Describe your desired expression in plain, conversational language. The AI assistant will understand your instructions and transform them into functional Python code.
Tailor code to your specific requirements, whether it’s for data transformation, filtering, calculations, or any other processing task.
Note: Press Ctrl + Space to list input columns and Ctrl + Enter to submit your request.
Input Example:
Create a column called inactive_days by calculating difference between last_login_date and current date and give those records whose inactive_days is more than 60 days.
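For the example prompt above, the assistant could generate code along the following lines. This is only an illustrative sketch (assuming a last_login_date column of date or timestamp type), not the assistant's exact output.

from pyspark.sql.functions import col, current_date, datediff

def filter_inactive_customers(df, config_map):
    # number of days since the last login
    df = df.withColumn('inactive_days', datediff(current_date(), col('last_login_date')))
    # keep only records inactive for more than 60 days
    return df.filter(col('inactive_days') > 60)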
Notes
Optionally, enter notes in the Notes tab and save the configuration.
Code Snippets
Described below are some sample Python code use cases:
Add a new column with constant value to existing dataframe using Pyspark
Description
This script demonstrates how to add a column with a constant value.
Here, a column named Constant_Column with the string value Constant_Value is added to an existing dataframe.
Sample Code
from pyspark.sql.functions import lit

def add_column_with_constant_value(df, config_map):
    """
    Add a column with constant value
    :param df:
    :param config_map:
    :return:
    """
    df = df.withColumn('Constant_Column', lit('Constant_Value'))
    return df
Example
Input Data Set
CustomerId | Surname | CreditScore | Geography | Age | Balance | EstSalary |
---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | France | 34 | 0 | 6534.18 |
15604348 | Allard | 710 | Spain | 22 | 0 | 99645.04 |
15693683 | Yuille | 814 | Germany | 29 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | Spain | 41 | 102827.44 | 64595.25 |
Output Data Set
CustomerId | Surname | CreditScore | Geography | Age | Balance | EstSalary | Constant_Column |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | France | 34 | 0 | 6534.18 | Constant_Value |
15604348 | Allard | 710 | Spain | 22 | 0 | 99645.04 | Constant_Value |
15693683 | Yuille | 814 | Germany | 29 | 97086.4 | 197276.13 | Constant_Value |
15738721 | Graham | 773 | Spain | 41 | 102827.44 | 64595.25 | Constant_Value |
Add a new column with random value to existing dataframe using Pyspark
Description
This script demonstrates how to add a column with random values.
Here, a column named Random_Column is added with random values generated using the rand function.
Sample Code
from pyspark.sql.functions import rand

def add_column_with_random_values(df, config_map):
    """
    Add column with random values to dataframe
    :param df:
    :param config_map:
    :return:
    """
    df = df.withColumn('Random_Column', rand())
    return df
Example
Input Data Set
CustomerId | Surname | CreditScore | Geography | Age | Balance | EstSalary |
---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | France | 34 | 0 | 6534.18 |
15604348 | Allard | 710 | Spain | 22 | 0 | 99645.04 |
15693683 | Yuille | 814 | Germany | 29 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | Spain | 41 | 102827.44 | 64595.25 |
Output Data Set
CustomerId | Surname | CreditScore | Geography | Age | Balance | EstSalary | Random_Column |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | France | 34 | 0 | 6534.18 | 0.0241309661 |
15604348 | Allard | 710 | Spain | 22 | 0 | 99645.04 | 0.5138384557 |
15693683 | Yuille | 814 | Germany | 29 | 97086.4 | 197276.13 | 0.2652246569 |
15738721 | Graham | 773 | Spain | 41 | 102827.44 | 64595.25 | 0.8454138247 |
Add a new column using expression to existing dataframe using Pyspark
Description
This script demonstrates how to add a new column derived from existing columns.
Here, a column named Transformed_Column is added by multiplying the columns EstSalary and Tenure.
Sample Code
def add_column_using_other_columns(df, config_map):
    """
    Add new column using existing column in a dataframe
    :param df:
    :param config_map:
    :return:
    """
    df = df.withColumn('Transformed_Column', (df.EstSalary * df.Tenure))
    return df
Example
Input Data Set
CustomerId | Surname | CreditScore | Tenure | Age | Balance | EstSalary |
---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | 9 | 34 | 0 | 6534.18 |
15604348 | Allard | 710 | 8 | 22 | 0 | 99645.04 |
15693683 | Yuille | 814 | 8 | 29 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | 9 | 41 | 102827.44 | 64595.25 |
Output Data Set
CustomerId | Surname | CreditScore | Tenure | Age | Balance | EstSalary | Transformed_Column |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | 9 | 34 | 0 | 6534.18 | 58807.62 |
15604348 | Allard | 710 | 8 | 22 | 0 | 99645.04 | 797160.32 |
15693683 | Yuille | 814 | 8 | 29 | 97086.4 | 197276.13 | 1578209.04 |
15738721 | Graham | 773 | 9 | 41 | 102827.44 | 64595.25 | 581357.25 |
Transform an existing column using Pyspark
Description
This script demonstrates how to transform a column.
Here, the values of the Balance column are rounded off and converted to integers.
Sample Code
from pyspark.sql.functions import round

def transform_column(df, config_map):
    """
    transform a column
    :param df:
    :param config_map:
    :return:
    """
    df = df.withColumn('Balance', round(df.Balance).cast('integer'))
    return df
Example
Input Data Set
CustomerId | Surname | CreditScore | Tenure | Age | Balance | EstSalary |
---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | 9 | 34 | 0 | 6534.18 |
15604348 | Allard | 710 | 8 | 22 | 0 | 99645.04 |
15693683 | Yuille | 814 | 8 | 29 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | 9 | 41 | 102827.44 | 64595.25 |
Output Data Set
CustomerId | Surname | CreditScore | Tenure | Age | Balance | EstSalary |
---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | 9 | 34 | 0 | 6534.18 |
15604348 | Allard | 710 | 8 | 22 | 0 | 99645.04 |
15693683 | Yuille | 814 | 8 | 29 | 97086 | 197276.13 |
15738721 | Graham | 773 | 9 | 41 | 102827 | 64595.25 |
Transform an existing column using udf
Description
This script demonstrates how to transform a column using a PySpark udf.
Here, the Gender column is encoded with integer values:
- Create a Python function that encodes the values of the Gender column to numeric values.
- Convert this function to a PySpark udf.
- Apply this udf to the Gender column and write the encoded values back to the same column.
An alternative that uses Spark's built-in functions instead of a udf is sketched after the example.
Sample Code
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def transform_column(df, config_map):
    def encode_gender(input_value):
        """
        encode gender to numeric values
        :param input_value:
        :return:
        """
        return_value = None
        if input_value is not None:
            if input_value.lower() == 'male':
                return_value = 1
            elif input_value.lower() == 'female':
                return_value = 2
        return return_value

    # create udf
    encode_gender_udf = udf(encode_gender, IntegerType())
    # apply udf to 'Gender' column
    df = df.withColumn('Gender', encode_gender_udf(df.Gender))
    return df
Example
Input Data Set
CustomerId | Surname | CreditScore | Gender | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | Male | 34 | 9 | 0 | 6534.18 |
15604348 | Allard | 710 | Male | 22 | 8 | 0 | 99645.04 |
15693683 | Yuille | 814 | Male | 29 | 8 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | Male | 41 | 9 | 102827.44 | 64595.25 |
15809248 | Cole | 524 | Female | 36 | 10 | 0 | 109614.57 |
Output Data Set
CustomerId | Surname | CreditScore | Gender | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | 1 | 34 | 9 | 0 | 6534.18 |
15604348 | Allard | 710 | 1 | 22 | 8 | 0 | 99645.04 |
15693683 | Yuille | 814 | 1 | 29 | 8 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | 1 | 41 | 9 | 102827.44 | 64595.25 |
15809248 | Cole | 524 | 2 | 36 | 10 | 0 | 109614.57 |
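As a side note, the same encoding can also be expressed with Spark's built-in when/otherwise column functions, which avoids the serialization overhead of a Python udf. The sketch below is an alternative approach, not part of the original snippet.

from pyspark.sql.functions import when, lower

def transform_column_builtin(df, config_map):
    # encode Gender using built-in column expressions instead of a Python udf
    df = df.withColumn(
        'Gender',
        when(lower(df.Gender) == 'male', 1)
        .when(lower(df.Gender) == 'female', 2)
        .otherwise(None)
    )
    return df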
Apply transformation using pandas in batch pipeline
Description
This script demonstrates how to convert a PySpark dataframe to a pandas dataframe and back.
Here, the following approach is used:
- First, convert the PySpark dataframe to a pandas dataframe using PySpark's toPandas function.
- Then, encode the Gender column to numeric values (the transformation).
- Finally, convert the pandas dataframe back to a PySpark dataframe.
Required libraries: numpy, pandas
Sample Code
def encode_gender(input_value):
    """
    encode gender to numeric values
    :param input_value:
    :return:
    """
    return_value = None
    if input_value is not None:
        if input_value.lower() == 'male':
            return_value = 1
        elif input_value.lower() == 'female':
            return_value = 2
    return return_value

def transform_using_pandas(df, config_map):
    # get sql context
    sql_context = df.sql_ctx
    # convert pyspark df to pandas df
    pandas_df = df.toPandas()
    # encode Gender into numeric values
    pandas_df['Gender'] = list(map(encode_gender, pandas_df['Gender']))
    # convert pandas dataframe to pyspark data frame
    df = sql_context.createDataFrame(pandas_df)
    return df
Example
Input Data Set
CustomerId | Surname | CreditScore | Gender | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | Male | 34 | 9 | 0 | 6534.18 |
15604348 | Allard | 710 | Male | 22 | 8 | 0 | 99645.04 |
15693683 | Yuille | 814 | Male | 29 | 8 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | Male | 41 | 9 | 102827.44 | 64595.25 |
15809248 | Cole | 524 | Female | 36 | 10 | 0 | 109614.57 |
Output Data Set
CustomerId | Surname | CreditScore | Gender | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | 1 | 34 | 9 | 0 | 6534.18 |
15604348 | Allard | 710 | 1 | 22 | 8 | 0 | 99645.04 |
15693683 | Yuille | 814 | 1 | 29 | 8 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | 1 | 41 | 9 | 102827.44 | 64595.25 |
15809248 | Cole | 524 | 2 | 36 | 10 | 0 | 109614.57 |
Apply transformation using pandas in streaming pipeline (using pandas udf)
Description
This script demonstrates how to apply a transformation to a PySpark streaming dataframe using pandas.
Here, the following approach is used:
- First, a Python function is created that performs the pandas transformation.
- This function is converted into a PySpark pandas udf.
- The udf is applied to the input dataframe to get the desired output dataframe.
Required libraries: numpy, pandas
Note: The schema of the output dataframe returned by the pandas udf must be properly defined.
Sample Code
# import modules
from pyspark.sql.types import DoubleType, StructField, StringType, IntegerType, StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType

def transform_using_pandas(df, config_map):
    def change_data_types(input_df):
        """
        change data types of columns of pandas dataframe
        :param input_df:
        :return:
        """
        input_df.CustomerId = input_df.CustomerId.apply(int)
        input_df.Surname = input_df.Surname.apply(str)
        input_df.CreditScore = input_df.CreditScore.apply(int)
        input_df.Geography = input_df.Geography.apply(str)
        input_df.Gender = input_df.Gender.apply(str)
        input_df.Age = input_df.Age.apply(int)
        input_df.Tenure = input_df.Tenure.apply(int)
        input_df.Balance = input_df.Balance.apply(float)
        input_df.NumOfProducts = input_df.NumOfProducts.apply(int)
        input_df.HasCrCard = input_df.HasCrCard.apply(int)
        input_df.IsActiveMember = input_df.IsActiveMember.apply(int)
        input_df.EstimatedSalary = input_df.EstimatedSalary.apply(float)
        input_df.Exited = input_df.Exited.apply(int)
        return input_df

    def encode_gender(input_value):
        """
        encode gender to numeric values
        :param input_value:
        :return:
        """
        return_value = None
        if input_value is not None:
            if input_value.lower() == 'male':
                return_value = 1
            elif input_value.lower() == 'female':
                return_value = 2
        return return_value

    def apply_transformation(churn_df):
        # change data types of the pandas dataframe columns
        churn_df = change_data_types(churn_df)
        # drop Geography column
        churn_df = churn_df.drop('Geography', axis=1)
        # encode values of Gender column
        churn_df['Gender'] = list(map(encode_gender, churn_df['Gender']))
        return churn_df

    # define output schema
    out_schema = StructType()
    out_schema.add(StructField('CustomerId', IntegerType(), False))
    out_schema.add(StructField('Surname', StringType(), False))
    out_schema.add(StructField('CreditScore', IntegerType(), False))
    out_schema.add(StructField('Gender', IntegerType(), False))
    out_schema.add(StructField('Age', IntegerType(), False))
    out_schema.add(StructField('Tenure', IntegerType(), False))
    out_schema.add(StructField('Balance', DoubleType(), False))
    out_schema.add(StructField('NumOfProducts', IntegerType(), False))
    out_schema.add(StructField('HasCrCard', IntegerType(), False))
    out_schema.add(StructField('IsActiveMember', IntegerType(), False))
    out_schema.add(StructField('EstimatedSalary', DoubleType(), False))
    out_schema.add(StructField('Exited', IntegerType(), False))
    # convert python function to pandas udf
    apply_transformation_udf = pandas_udf(apply_transformation, out_schema, PandasUDFType.GROUPED_MAP)
    # apply pandas udf to dataframe
    df = df.groupby('NumOfProducts').apply(apply_transformation_udf)
    return df
Example
Input Data Set
CustomerId | Surname | CreditScore | Geography | Gender | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | France | Male | 34 | 9 | 0 | 6534.18 |
15604348 | Allard | 710 | Spain | Male | 22 | 8 | 0 | 99645.04 |
15693683 | Yuille | 814 | Germany | Male | 29 | 8 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | Spain | Male | 41 | 9 | 102827.44 | 64595.25 |
15809248 | Cole | 524 | France | Female | 36 | 10 | 0 | 109614.57 |
Output Data Set
CustomerId | Surname | CreditScore | Gender | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | 1 | 34 | 9 | 0 | 6534.18 |
15604348 | Allard | 710 | 1 | 22 | 8 | 0 | 99645.04 |
15693683 | Yuille | 814 | 1 | 29 | 8 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | 1 | 41 | 9 | 102827.44 | 64595.25 |
15809248 | Cole | 524 | 2 | 36 | 10 | 0 | 109614.57 |
Make a POST web service call using Pyspark
Description
This script demonstrates how to make a POST web service call.
Here, for each record in the CreditScore column, the endpoint is hit using the POST method. The record value is passed as JSON data in the POST request.
Config Parameters:
url: the endpoint URL
Sample Code
def post_web_service_call(df, config_map):
    """
    make a post web service call for each record of a column
    :param df:
    :param config_map:
    :return:
    """
    # imports
    import requests
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType, StringType, DoubleType
    # get url from config map
    url = config_map.get('url', '')

    def post_call(value):
        return_value = value
        if url:
            request_data = {'post_call_input': value}
            # hit end point
            r = requests.post(url, json=request_data)
            # check status
            if r.status_code == 200:
                return_value = str(r.text)
        return return_value

    # as of now a string value is returned from the udf.
    # it can be changed to IntegerType/DoubleType based on the return type of the udf
    post_call_udf = udf(post_call, StringType())
    # pass the required column to the udf (here using CreditScore)
    df = df.withColumn('Post_Call_Output', post_call_udf(df.CreditScore))
    return df
Example
Input Data Set
CustomerId | Surname | CreditScore | Gender | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | Male | 34 | 9 | 0 | 6534.18 |
15604348 | Allard | 710 | Male | 22 | 8 | 0 | 99645.04 |
15693683 | Yuille | 814 | Male | 29 | 8 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | Male | 41 | 9 | 102827.44 | 64595.25 |
Output Data Set
CustomerId | Surname | CreditScore | Gender | Age | Tenure | Balance | EstSalary | Post_Call_Output |
---|---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | Male | 34 | 9 | 0 | 6534.18 | Receive Credit Score : 413 |
15604348 | Allard | 710 | Male | 22 | 8 | 0 | 99645.04 | Receive Credit Score : 710 |
15693683 | Yuille | 814 | Male | 29 | 8 | 97086.4 | 197276.13 | Receive Credit Score : 814 |
15738721 | Graham | 773 | Male | 41 | 9 | 102827.44 | 64595.25 | Receive Credit Score : 773 |
Extract top-N records using Pyspark
Description
This script demonstrates how to select the top N records from a dataframe.
Here, we are extracting the top 50 records based on estimated salary (derived from the EstSalary column). A variant that reads the record count from the processor configuration is sketched after the example.
Sample Code
def extract_top_n_records(df, config_map):
    """
    Extract top n records
    :param df:
    :param config_map:
    :return:
    """
    top_records_count = 50
    df = df.sort(df.EstSalary.desc()).limit(top_records_count)
    return df
Example
Input Data Set
CustomerId | Surname | CreditScore | Gender | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | Male | 34 | 9 | 0 | 6534.18 |
15604348 | Allard | 710 | Male | 22 | 8 | 0 | 99645.04 |
15693683 | Yuille | 814 | Male | 29 | 8 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | Male | 41 | 9 | 102827.44 | 64595.25 |
Output Data Set
CustomerId | Surname | CreditScore | Gender | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|---|
15616550 | Chidiebele | 698 | Male | 44 | 10 | 116363.37 | 198059.16 |
15693683 | Yuille | 814 | Male | 29 | 8 | 97086.4 | 197276.13 |
15641582 | Chibugo | 735 | Male | 43 | 10 | 123180.01 | 196673.28 |
15755196 | Lavine | 834 | Female | 49 | 2 | 131394.56 | 194365.76 |
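As a small variation (not part of the original snippet), the record count can be read from the Add Configuration parameters instead of being hard-coded; the key name top_n below is hypothetical.

def extract_top_n_records(df, config_map):
    # read the count from configuration; 'top_n' is a hypothetical key, defaulting to 50
    top_records_count = int(config_map.get('top_n', 50))
    df = df.sort(df.EstSalary.desc()).limit(top_records_count)
    return df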
Drop/Delete a column using Pyspark
Description
This script demonstrates how to drop a column from a dataframe.
Here, we are dropping the Gender column from the input dataframe.
Sample Code
def drop_column(df, config_map):
    """
    Drop a column
    :param df:
    :param config_map:
    :return:
    """
    df = df.drop('Gender')
    return df
Example
Input Data Set
CustomerId | Surname | CreditScore | Gender | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | Male | 34 | 9 | 0 | 6534.18 |
15604348 | Allard | 710 | Male | 22 | 8 | 0 | 99645.04 |
15693683 | Yuille | 814 | Male | 29 | 8 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | Male | 41 | 9 | 102827.44 | 64595.25 |
Output Data Set
CustomerId | Surname | CreditScore | Age | Tenure | Balance | EstSalary |
---|---|---|---|---|---|---|
15633059 | Fanucci | 413 | 34 | 9 | 0 | 6534.18 |
15604348 | Allard | 710 | 22 | 8 | 0 | 99645.04 |
15693683 | Yuille | 814 | 29 | 8 | 97086.4 | 197276.13 |
15738721 | Graham | 773 | 41 | 9 | 102827.44 | 64595.25 |
Web Scraping using Python
Description
This script has a sample code snippet for scraping data from a website. The input DataFrame contains the URLs of the web pages to be scraped in a column named ‘url_path’. The Python processor navigates to each URL and extracts data from the web page. The extracted data can be used for further processing.
Required Libraries: beautifulsoup4, requests
Sample Code
import requests
from bs4 import BeautifulSoup
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_web_page_content(df, config_map):
    """
    parse url present in input df
    :param df:
    :param config_map:
    :return:
    """
    def process_url(url):
        # Send a GET request to the URL
        response = requests.get(url)
        # Check if the request was successful (status code 200)
        if response.status_code == 200:
            # Parse the HTML content of the page
            soup = BeautifulSoup(response.text, 'html.parser', preserve_whitespace_tags=["p"])
            doc_content = soup.find('main', id='docsContent')
            if doc_content is not None:
                return doc_content.get_text()
            else:
                div_content = soup.find('div', class_='content')
                if div_content is not None:
                    return div_content.get_text()
        # return None when the request fails or no matching element is found
        return None

    # create udf
    process_url_udf = udf(process_url, StringType())
    df = df.withColumn('url_content', process_url_udf(df.url_path))
    return df
Example
Input Data Set
url_path |
---|
https://docs.gathr.one/gathr-saas/docs/introduction/ |
https://docs.gathr.one/gathr-saas/docs/introduction/about_gathr/ |
https://docs.gathr.one/gathr-saas/docs/introduction/faqs/ |
Output Data Set
url_content | url_path |
---|---|
IntroductionAbout Gathr →Core Concepts →Free Trials →Security →FAQs → | https://docs.gathr.one/gathr-saas/docs/introduction/ |
HomeDocsIntroductionAbout GathrAbout Gathr In this article ExpandCollapseSupported BrowsersIn this articleSupported BrowsersWelcome to Gathr! An enterprise grade Ingestion, ETL and analytics platform for data processing. This platform is based on the best of breed open-source technologies and supports the end-to-end functionality of data ingestion, enrichment, machine learning, action triggers and visualization. Leverage the intuitive visual interface to develop and deploy data ingestion and Change Data Capture (CDC) applications up to ten times faster than any other platform, across industries, dataformats, and use cases.As it is a SaaS offering there are absolutely no prerequisites related to installation and setup.You can simply get registered to utilize template solutions that are offered by Gathr and customized to your data ingestion, changed data capture and ETL pipeline needs.For more details about how to set up a Gathr account, see User Sign Up Supported Browsers #Gathr is currently certified to be used with Google Chrome and Safari browsers.If you have any feedback on Gathr documentation, please email us!Core Concepts → | https://docs.gathr.one/gathr-saas/docs/introduction/about_gathr/ |
HomeDocsIntroductionFAQsFAQs In this article ExpandCollapseAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsIn this articleAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsSome most common questions about various Gathr related features have been answered in this section.AWS Compute Environments FAQs #Q: How can I connect from the AWS region which is not listed in the Compute Setup?In case if the preferred region where you want to host the compute environments is not supported by Gathr, please write to our support team at:saas-support@gathr.oneAlternatively, you can use VPC peering for clusters in this region.Q: Can I switch an Ingestion or CDC Application configured to run in Gathr’s account to run in my account?Yes. A pipeline created with tunnel/upload file feature for source or target, configured to run in Gathr’s account can be reconfigured to run in your account.Prerequisite: Source/target and EMR compute should be configured in the same region, else (optional) you can create VPC peering between regions in your account.Q: Can I switch an Ingestion or CDC Application configured to run in my account to run in Gathr’s account?Yes. A pipeline created with tunnel for source or target, configured to run in your account can be reconfigured to run in Gathr’s account in the same region.No. A pipeline created with upload file feature for source or target configured to run in your account during creation cannot be reconfigured to run in Gathr’s account.Q: Will I be charged differently if I am running pipelines in my Account?Yes. Please contact our support team for more details.saas-support@gathr.oneAzure Databricks Compute Environments FAQs #Q: What costs are associated with using Databricks in Gathr? The costs primarily consist of the infrastructure and cloud-related expenses incurred on your Databricks account. This includes the charges levied by Databricks for the resources consumed during the active period of your clusters. Gathr, however, does not impose any additional charges for designing or deploying pipelines within the platform. Q: Are there any additional charges from Gathr for using Databricks? You will be only charged for the credit points consumed during application execution. Gathr does not impose any supplementary charges for designing or deploying applications on Databricks. Q: How can I register my Azure Databricks account with Gathr? To register your Azure Databricks account with Gathr, navigate to your User Profile > Compute Setup tab, click on ADD CLOUD SERVICE ACCOUNT, select Azure Databricks as the Account Type, and provide the required details, including Account Name, Instance URL, Access Token, and DBFS Gathr Metadata Repository Path. Then simply save the account details. Q: How do I link a compute environment to a project after registering a Databricks account? Administrators can link registered compute accounts to specific projects within Gathr. This allows project users to utilize the registered clusters for deploying their applications. For detailed steps, refer to Link Compute Environment to Project Q: What is the difference between the Gathr and Databricks engines in Gathr? The Gathr engine is the default computing environment for designing and deploying applications within the Gathr platform. The Databricks engine, when registered, enables users to use clusters that can be launched from Gathr on their Databricks workspaces. Q: Can I switch between the Gathr and Databricks engines at any time? 
Yes, users can switch between the Gathr and Databricks engines based on their specific requirements. However, switching is not allowed while actively designing applications within the interface.If you have any feedback on Gathr documentation, please email us!← SecurityUser Sign Up → | https://docs.gathr.one/gathr-saas/docs/introduction/faqs/ |
Text Chunking using Textwrap
Description
This script has a sample code snippet that shows how to make extracted text more readable and manageable. You can use the ‘textwrap’ module to break it into smaller, well-defined chunks, which makes the data easier to process, analyze, or present. By chunking the output, you can isolate specific sections for further scrutiny, which is handy when dealing with a massive volume of data or similar use cases.
Required Libraries: textwrap
The custom_text_splitter function splits the input text into chunks based on a specified separator (’\n’), chunk size (1500 characters), and overlap (200 characters). The chunks are then processed, and for each chunk, a dictionary is created with ’text’, ‘url’, and ‘id’ keys. The ‘id’ is generated based on the URL, and the resulting dictionaries are converted to JSON strings.
Sample Code
import json
import textwrap
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def create_chunks_using_textwrap(df, config_map):
    """
    create chunks of input text
    :param df:
    :param config_map:
    :return:
    """
    def create_chunks(input_value, url):
        def custom_text_splitter(text, sep, chunk_size, chunk_overlap):
            chunks = textwrap.wrap(text, width=chunk_size, expand_tabs=False)
            if chunk_overlap > 0:
                # prepend the tail of the previous chunk to create the overlap
                for i in range(1, len(chunks)):
                    chunks[i] = chunks[i - 1][-chunk_overlap:] + chunks[i]
            return chunks

        chunks = custom_text_splitter(input_value, sep="\n", chunk_size=1500, chunk_overlap=200)
        new_data = []
        for idx, chunk in enumerate(chunks):
            chunk_dict = {
                "text": chunk,
                "url": url,
                # build the id from the url and the chunk index
                "id": url.replace('://', '_').replace('/', '_') + '_' + str(idx)
            }
            new_data.append(json.dumps(chunk_dict))
        return new_data

    create_chunks_udf = udf(create_chunks, ArrayType(StringType()))
    df = df.withColumn('chunks', create_chunks_udf(df.url_content, df.url_path))
    return df
Example
Input Data Set
url_content | url_path |
---|---|
IntroductionAbout Gathr →Core Concepts →Free Trials →Security →FAQs → | https://docs.gathr.one/gathr-saas/docs/introduction/ |
HomeDocsIntroductionAbout GathrAbout Gathr In this article ExpandCollapseSupported BrowsersIn this articleSupported BrowsersWelcome to Gathr! An enterprise grade Ingestion, ETL and analytics platform for data processing. This platform is based on the best of breed open-source technologies and supports the end-to-end functionality of data ingestion, enrichment, machine learning, action triggers and visualization. Leverage the intuitive visual interface to develop and deploy data ingestion and Change Data Capture (CDC) applications up to ten times faster than any other platform, across industries, dataformats, and use cases.As it is a SaaS offering there are absolutely no prerequisites related to installation and setup.You can simply get registered to utilize template solutions that are offered by Gathr and customized to your data ingestion, changed data capture and ETL pipeline needs.For more details about how to set up a Gathr account, see User Sign Up âSupported Browsers #Gathr is currently certified to be used with Google Chrome and Safari browsers.If you have any feedback on Gathr documentation, please email us!Core Concepts → | https://docs.gathr.one/gathr-saas/docs/introduction/about_gathr/ |
HomeDocsIntroductionFAQsFAQs In this article ExpandCollapseAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsIn this articleAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsSome most common questions about various Gathr related features have been answered in this section.AWS Compute Environments FAQs #Q: How can I connect from the AWS region which is not listed in the Compute Setup?In case if the preferred region where you want to host the compute environments is not supported by Gathr, please write to our support team at:saas-support@gathr.oneAlternatively, you can use VPC peering for clusters in this region.Q: Can I switch an Ingestion or CDC Application configured to run in Gathr’s account to run in my account?Yes. A pipeline created with tunnel/upload file feature for source or target, configured to run in Gathr’s account can be reconfigured to run in your account.Prerequisite: Source/target and EMR compute should be configured in the same region, else (optional) you can create VPC peering between regions in your account.Q: Can I switch an Ingestion or CDC Application configured to run in my account to run in Gathr’s account?Yes. A pipeline created with tunnel for source or target, configured to run in your account can be reconfigured to run in Gathr’s account in the same region.No. A pipeline created with upload file feature for source or target configured to run in your account during creation cannot be reconfigured to run in Gathr’s account.Q: Will I be charged differently if I am running pipelines in my Account?Yes. Please contact our support team for more details.saas-support@gathr.oneAzure Databricks Compute Environments FAQs #Q: What costs are associated with using Databricks in Gathr? The costs primarily consist of the infrastructure and cloud-related expenses incurred on your Databricks account. This includes the charges levied by Databricks for the resources consumed during the active period of your clusters. Gathr, however, does not impose any additional charges for designing or deploying pipelines within the platform. Q: Are there any additional charges from Gathr for using Databricks? You will be only charged for the credit points consumed during application execution. Gathr does not impose any supplementary charges for designing or deploying applications on Databricks. Q: How can I register my Azure Databricks account with Gathr? To register your Azure Databricks account with Gathr, navigate to your User Profile > Compute Setup tab, click on ADD CLOUD SERVICE ACCOUNT, select Azure Databricks as the Account Type, and provide the required details, including Account Name, Instance URL, Access Token, and DBFS Gathr Metadata Repository Path. Then simply save the account details. Q: How do I link a compute environment to a project after registering a Databricks account? Administrators can link registered compute accounts to specific projects within Gathr. This allows project users to utilize the registered clusters for deploying their applications. For detailed steps, refer to Link Compute Environment to Project â Q: What is the difference between the Gathr and Databricks engines in Gathr? The Gathr engine is the default computing environment for designing and deploying applications within the Gathr platform. The Databricks engine, when registered, enables users to use clusters that can be launched from Gathr on their Databricks workspaces. Q: Can I switch between the Gathr and Databricks engines at any time? 
Yes, users can switch between the Gathr and Databricks engines based on their specific requirements. However, switching is not allowed while actively designing applications within the interface.If you have any feedback on Gathr documentation, please email us!← SecurityUser Sign Up → | https://docs.gathr.one/gathr-saas/docs/introduction/faqs/ |
Output Data Set
url_content | chunks | url_path |
---|---|---|
IntroductionAbout Gathr →Core Concepts →Free Trials →Security →FAQs → | [{“text”: “IntroductionAbout Gathr \u2192Core Concepts \u2192Free Trials \u2192Security \u2192FAQs \u2192”, “url”: “https://docs.gathr.one/gathr-saas/docs/introduction/”, “id”: “https_docs.gathr.one_gathr-saas_docs_introduction__0”}] | https://docs.gathr.one/gathr-saas/docs/introduction/ |
HomeDocsIntroductionAbout GathrAbout Gathr In this article ExpandCollapseSupported BrowsersIn this articleSupported BrowsersWelcome to Gathr! An enterprise grade Ingestion, ETL and analytics platform for data processing. This platform is based on the best of breed open-source technologies and supports the end-to-end functionality of data ingestion, enrichment, machine learning, action triggers and visualization. Leverage the intuitive visual interface to develop and deploy data ingestion and Change Data Capture (CDC) applications up to ten times faster than any other platform, across industries, dataformats, and use cases.As it is a SaaS offering there are absolutely no prerequisites related to installation and setup.You can simply get registered to utilize template solutions that are offered by Gathr and customized to your data ingestion, changed data capture and ETL pipeline needs.For more details about how to set up a Gathr account, see User Sign Up âSupported Browsers #Gathr is currently certified to be used with Google Chrome and Safari browsers.If you have any feedback on Gathr documentation, please email us!Core Concepts → | [{“text”: “HomeDocsIntroductionAbout GathrAbout Gathr In this article ExpandCollapseSupported BrowsersIn this articleSupported BrowsersWelcome to Gathr! An enterprise grade Ingestion, ETL and analytics platform for data processing. This platform is based on the best of breed open-source technologies and supports the end-to-end functionality of data ingestion, enrichment, machine learning, action triggers and visualization. Leverage the intuitive visual interface to develop and deploy data ingestion and Change Data Capture (CDC) applications up to ten times faster than any other platform, across industries, dataformats, and use cases.As it is a SaaS offering there are absolutely no prerequisites related to installation and setup.You can simply get registered to utilize template solutions that are offered by Gathr and customized to your data ingestion, changed data capture and ETL pipeline needs.For more details about how to set up a Gathr account, see User Sign Up \u00e2\u0086\u0092Supported Browsers #Gathr is currently certified to be used with Google Chrome and Safari browsers.If you have any feedback on Gathr documentation, please email us!Core Concepts \u2192”, “url”: “https://docs.gathr.one/gathr-saas/docs/introduction/about_gathr/”, “id”: “https_docs.gathr.one_gathr-saas_docs_introduction_about_gathr__0”}] | https://docs.gathr.one/gathr-saas/docs/introduction/about_gathr/ |
HomeDocsIntroductionFAQsFAQs In this article ExpandCollapseAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsIn this articleAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsSome most common questions about various Gathr related features have been answered in this section.AWS Compute Environments FAQs #Q: How can I connect from the AWS region which is not listed in the Compute Setup?In case if the preferred region where you want to host the compute environments is not supported by Gathr, please write to our support team at:saas-support@gathr.oneAlternatively, you can use VPC peering for clusters in this region.Q: Can I switch an Ingestion or CDC Application configured to run in Gathr’s account to run in my account?Yes. A pipeline created with tunnel/upload file feature for source or target, configured to run in Gathr’s account can be reconfigured to run in your account.Prerequisite: Source/target and EMR compute should be configured in the same region, else (optional) you can create VPC peering between regions in your account.Q: Can I switch an Ingestion or CDC Application configured to run in my account to run in Gathr’s account?Yes. A pipeline created with tunnel for source or target, configured to run in your account can be reconfigured to run in Gathr’s account in the same region.No. A pipeline created with upload file feature for source or target configured to run in your account during creation cannot be reconfigured to run in Gathr’s account.Q: Will I be charged differently if I am running pipelines in my Account?Yes. Please contact our support team for more details.saas-support@gathr.oneAzure Databricks Compute Environments FAQs #Q: What costs are associated with using Databricks in Gathr? The costs primarily consist of the infrastructure and cloud-related expenses incurred on your Databricks account. This includes the charges levied by Databricks for the resources consumed during the active period of your clusters. Gathr, however, does not impose any additional charges for designing or deploying pipelines within the platform. Q: Are there any additional charges from Gathr for using Databricks? You will be only charged for the credit points consumed during application execution. Gathr does not impose any supplementary charges for designing or deploying applications on Databricks. Q: How can I register my Azure Databricks account with Gathr? To register your Azure Databricks account with Gathr, navigate to your User Profile > Compute Setup tab, click on ADD CLOUD SERVICE ACCOUNT, select Azure Databricks as the Account Type, and provide the required details, including Account Name, Instance URL, Access Token, and DBFS Gathr Metadata Repository Path. Then simply save the account details. Q: How do I link a compute environment to a project after registering a Databricks account? Administrators can link registered compute accounts to specific projects within Gathr. This allows project users to utilize the registered clusters for deploying their applications. For detailed steps, refer to Link Compute Environment to Project â Q: What is the difference between the Gathr and Databricks engines in Gathr? The Gathr engine is the default computing environment for designing and deploying applications within the Gathr platform. The Databricks engine, when registered, enables users to use clusters that can be launched from Gathr on their Databricks workspaces. Q: Can I switch between the Gathr and Databricks engines at any time? 
Yes, users can switch between the Gathr and Databricks engines based on their specific requirements. However, switching is not allowed while actively designing applications within the interface.If you have any feedback on Gathr documentation, please email us!← SecurityUser Sign Up → | [{“text”: “HomeDocsIntroductionFAQsFAQs In this article ExpandCollapseAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsIn this articleAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsSome most common questions about various Gathr related features have been answered in this section.AWS Compute Environments FAQs #Q: How can I connect from the AWS region which is not listed in the Compute Setup?In case if the preferred region where you want to host the compute environments is not supported by Gathr, please write to our support team at:saas-support@gathr.oneAlternatively, you can use VPC peering for clusters in this region.Q: Can I switch an Ingestion or CDC Application configured to run in Gathr\u00e2\u0080\u0099s account to run in my account?Yes. A pipeline created with tunnel/upload file feature for source or target, configured to run in Gathr\u00e2\u0080\u0099s account can be reconfigured to run in your account.Prerequisite: Source/target and EMR compute should be configured in the same region, else (optional) you can create VPC peering between regions in your account.Q: Can I switch an Ingestion or CDC Application configured to run in my account to run in Gathr\u00e2\u0080\u0099s account?Yes. A pipeline created with tunnel for source or target, configured to run in your account can be reconfigured to run in Gathr\u00e2\u0080\u0099s account in the same region.No. A pipeline created with upload file feature for source or target configured to run in your account during creation cannot be reconfigured to”, “url”: “https://docs.gathr.one/gathr-saas/docs/introduction/faqs/”, “id”: “https_docs.gathr.one_gathr-saas_docs_introduction_faqs__0”},{“text”: “nfigured to run in Gathr\u00e2\u0080\u0099s account in the same region.No. A pipeline created with upload file feature for source or target configured to run in your account during creation cannot be reconfigured torun in Gathr\u00e2\u0080\u0099s account.Q: Will I be charged differently if I am running pipelines in my Account?Yes. Please contact our support team for more details.saas-support@gathr.oneAzure Databricks Compute Environments FAQs #Q: What costs are associated with using Databricks in Gathr? The costs primarily consist of the infrastructure and cloud-related expenses incurred on your Databricks account. This includes the charges levied by Databricks for the resources consumed during the active period of your clusters. Gathr, however, does not impose any additional charges for designing or deploying pipelines within the platform. Q: Are there any additional charges from Gathr for using Databricks? You will be only charged for the credit points consumed during application execution. Gathr does not impose any supplementary charges for designing or deploying applications on Databricks. Q: How can I register my Azure Databricks account with Gathr? To register your Azure Databricks account with Gathr, navigate to your User Profile > Compute Setup tab, click on ADD CLOUD SERVICE ACCOUNT, select Azure Databricks as the Account Type, and provide the required details, including Account Name, Instance URL, Access Token, and DBFS Gathr Metadata Repository Path. Then simply save the account details. 
Q: How do I link a compute environment to a project after registering a Databricks account? Administrators can link registered compute accounts to specific projects within Gathr. This allows project users”, “url”: “https://docs.gathr.one/gathr-saas/docs/introduction/faqs/”, “id”: “https_docs.gathr.one_gathr-saas_docs_introduction_faqs__1”},{“text”: “ow do I link a compute environment to a project after registering a Databricks account? Administrators can link registered compute accounts to specific projects within Gathr. This allows project usersto utilize the registered clusters for deploying their applications. For detailed steps, refer to Link Compute Environment to Project \u00e2\u0086\u0092 Q: What is the difference between the Gathr and Databricks engines in Gathr? The Gathr engine is the default computing environment for designing and deploying applications within the Gathr platform. The Databricks engine, when registered, enables users to use clusters that can be launched from Gathr on their Databricks workspaces. Q: Can I switch between the Gathr and Databricks engines at any time? Yes, users can switch between the Gathr and Databricks engines based on their specific requirements. However, switching is not allowed while actively designing applications within the interface.If you have any feedback on Gathr documentation, please email us!\u2190 SecurityUser Sign Up \u2192”, “url”: “https://docs.gathr.one/gathr-saas/docs/introduction/faqs/”, “id”: “https_docs.gathr.one_gathr-saas_docs_introduction_faqs__2”}] | https://docs.gathr.one/gathr-saas/docs/introduction/faqs/ |
Parse docx, pptx, xlsx and Images
Description
This script provides a PySpark function for extracting text from base64-encoded binary content within a DataFrame, handling various document formats such as docx, pptx, xlsx and image types.
Required Libraries: docx2txt, python-pptx, openpyxl, pytesseract
Sample Code
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import base64
import io
import docx2txt
from pptx import Presentation
from PIL import Image
import pytesseract
from openpyxl import load_workbook

def get_text(df, config_map):
    # Define the UDF
    def extract_text_from_base64(base64_encoded_string):
        # Decode base64-encoded data
        decoded_data = base64.b64decode(base64_encoded_string)
        # Wrap the decoded data in a BytesIO object for flexibility
        byte_stream = io.BytesIO(decoded_data)
        # Attempt to identify the file format
        try:
            # Try extracting text from DOCX
            docx_text = docx2txt.process(byte_stream)
            return " " + docx_text
        except Exception:
            pass
        try:
            # Rewind the stream before trying the next format
            byte_stream.seek(0)
            # Try extracting text from PPTX
            presentation = Presentation(byte_stream)
            pptx_text = ""
            for slide in presentation.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        pptx_text += shape.text + "\n"
            return " " + pptx_text
        except Exception:
            pass
        try:
            # Rewind the stream and load the workbook
            byte_stream.seek(0)
            workbook = load_workbook(byte_stream, read_only=True)
            # Initialize an empty string to store extracted text
            extracted_text = ""
            # Iterate through all sheets in the workbook
            for sheet_name in workbook.sheetnames:
                sheet = workbook[sheet_name]
                # Iterate through all rows in the sheet
                for row in sheet.iter_rows(values_only=True):
                    # Convert each cell value to string and append to the extracted text
                    row_text = ' '.join(str(cell) for cell in row if cell is not None)
                    extracted_text += row_text + '\n'
            return " " + extracted_text
        except Exception:
            pass
        try:
            # Rewind the stream and try extracting text from image using Tesseract OCR
            byte_stream.seek(0)
            image = Image.open(byte_stream)
            image_text = pytesseract.image_to_string(image)
            return " " + image_text
        except Exception:
            pass
        return "Unknown format: Unable to extract text."

    # Register the UDF with PySpark
    extract_text_udf = udf(extract_text_from_base64, StringType())
    # Apply the UDF using withColumn
    df = df.withColumn('output', extract_text_udf(df.base64EncodedBinaryContent))
    return df
Example
Input Data Set
path | modificationTime | length | base64EncodedBinaryContent |
---|---|---|---|
s3a://example/testfile.doc | 2024-01-09 09:03:10 | 250880 | 0M8R4KGxGuEAAAAAAAA AAAAAAAAAAAAAOwADAP7 |
Output Data Set
path | modificationTime | length | base64EncodedBinaryContent | output |
---|---|---|---|---|
s3a://example/testfile.doc | 2024-01-09 09:03:10 | 250880 | 0M8R4KGxGuEAAAAAAAAA AAAAAAAAAAAAOwADAP7 | This is Parsed Output |