Python Processor

The Python processor allows you to perform the following operations:

  • Write custom Python Spark code for defining transformations on Spark DataFrames.

  • Write custom Python code for processing input records at runtime.

Email Gathr Support to enable the Python processor.

The application ships with several code snippets, which are explained in this topic to help you get started with the Python processor.

Processor Configuration

Input Source: Input type to the custom Python function that will be called by Gathr.

The input provided to the custom python function will be a DataFrame. The custom function is expected to return a DataFrame after any processing.

Input Type: There are two ways to configure the Python processor:

  • Inline: Write Python code in the text editor. When this option is selected, one additional field, Python Code, appears.

    Python Code: Write custom Python code directly in the text editor.

  • Upload: Upload one or more Python scripts (.py files) and Python packages (.egg/.zip files). You must specify the module name (which should be part of the uploaded files or package) and the name of the method that the Python processor will call.

    When you select Upload, the UPLOAD FILE option appears on the screen. Browse and select the files to be used in the Python processor.

    One additional field, Import Module, also appears when the Upload option is selected.

    Import Module: Specify the name of the module that contains the function to be called by the Python processor. The uploaded files are listed in the drop-down list.

    The drop-down list shows only .py files. You can also type a module name if it does not appear in the drop-down list.

Function Name: Name of the Python function that is defined in the inline Python code or in the uploaded script.
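
As a rough sketch of what an inline or uploaded script could contain, the function below accepts the input DataFrame and the configuration dictionary and returns a DataFrame. The name remove_duplicate_rows is hypothetical; whatever name is used must match the Function Name field.

```python
def remove_duplicate_rows(df, config_map):
    """
    Hypothetical processor function: drop exact duplicate rows.
    :param df: input Spark DataFrame supplied by the processor
    :param config_map: dictionary of configuration parameters (unused here)
    :return: a Spark DataFrame
    """
    # dropDuplicates() with no arguments compares all columns
    return df.dropDuplicates()
```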

Add Configuration: Adds additional properties, which are passed to the Python processor as configuration parameters.

Provide the configuration parameters as key-value pairs. These parameters are made available, as a dictionary, in the second argument of the function given in the Function Name field. That function therefore takes two arguments: (df, config_map)

The first argument is the DataFrame and the second argument is a dictionary that contains the configuration parameters as key-value pairs.
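
A minimal sketch of reading a configuration parameter inside the function follows. The key min_credit_score and the CreditScore column are illustrative assumptions, and the value is converted with int() on the assumption that parameters entered as key-value pairs may arrive as strings.

```python
def filter_by_min_credit_score(df, config_map):
    """
    Keep rows whose CreditScore is at least the configured minimum.
    'min_credit_score' is a hypothetical key added under Add Configuration.
    """
    # fall back to 0 (keep everything) when the key is not configured
    min_score = int(config_map.get('min_credit_score', 0))
    # DataFrame.filter accepts a SQL expression string
    return df.filter(f"CreditScore >= {min_score}")
```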


Ask AI Assistant

Use the AI assistant feature to simplify the creation of Python code.

It allows you to generate complex Python code effortlessly, using natural language inputs as your guide.

Describe your desired expression in plain, conversational language. The AI assistant will interpret your instructions and turn them into functional Python code.

Tailor code to your specific requirements, whether it’s for data transformation, filtering, calculations, or any other processing task.

Note: Press Ctrl + Space to list input columns and Ctrl + Enter to submit your request.

Input Example:

Create a column called inactive_days by calculating difference between last_login_date and current date and give those records whose inactive_days is more than 60 days.
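
For orientation, a request like the one above might translate into code along the following lines. This is a hand-written sketch, not actual assistant output; the function name is hypothetical and the column names are taken from the example request.

```python
def find_inactive_users(df, config_map):
    """
    Add inactive_days and keep records inactive for more than 60 days.
    """
    # datediff(end, start) returns the number of days from start to end
    df = df.selectExpr(
        "*",
        "datediff(current_date(), last_login_date) AS inactive_days",
    )
    return df.filter("inactive_days > 60")
```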


Notes

Optionally, enter notes in the Notes tab and save the configuration.


Code Snippets

Described below are some sample Python code use cases:


Add a new column with constant value to existing dataframe using Pyspark

Description

This script demonstrates how to add a column with a constant value.

Here, a column named Constant_Column with the string value Constant_Value is added to an existing DataFrame.

Sample Code

from pyspark.sql.functions import lit

def add_column_with_constant_value(df, config_map):
    """
    Add a column with constant value
    :param df:
    :param config_map:
    :return:
    """
    df = df.withColumn('Constant_Column', lit('Constant_Value'))
    return df

Example

Input Data Set

CustomerId  Surname  CreditScore  Geography  Age  Balance    EstSalary
15633059    Fanucci  413          France     34   0          6534.18
15604348    Allard   710          Spain      22   0          99645.04
15693683    Yuille   814          Germany    29   97086.4    197276.13
15738721    Graham   773          Spain      41   102827.44  64595.25

Output Data Set

CustomerId  Surname  CreditScore  Geography  Age  Balance    EstSalary  Constant_Column
15633059    Fanucci  413          France     34   0          6534.18    Constant_Value
15604348    Allard   710          Spain      22   0          99645.04   Constant_Value
15693683    Yuille   814          Germany    29   97086.4    197276.13  Constant_Value
15738721    Graham   773          Spain      41   102827.44  64595.25   Constant_Value

Add a new column with random value to existing dataframe using Pyspark

Description

This script demonstrates how to add a column with random values.

Here, the column Random_Column is added with random values generated by the rand function, which returns values uniformly distributed in [0.0, 1.0).

Sample Code

from pyspark.sql.functions import rand


def add_column_with_random_values(df, config_map):
    """
    Add column with random values to dataframe
    :param df:
    :param config_map:
    :return:
    """
    df = df.withColumn('Random_Column', rand())
    return df

Example

Input Data Set

CustomerId  Surname  CreditScore  Geography  Age  Balance    EstSalary
15633059    Fanucci  413          France     34   0          6534.18
15604348    Allard   710          Spain      22   0          99645.04
15693683    Yuille   814          Germany    29   97086.4    197276.13
15738721    Graham   773          Spain      41   102827.44  64595.25

Output Data Set

CustomerId  Surname  CreditScore  Geography  Age  Balance    EstSalary  Random_Column
15633059    Fanucci  413          France     34   0          6534.18    0.0241309661
15604348    Allard   710          Spain      22   0          99645.04   0.5138384557
15693683    Yuille   814          Germany    29   97086.4    197276.13  0.2652246569
15738721    Graham   773          Spain      41   102827.44  64595.25   0.8454138247

Add a new column using expression to existing dataframe using Pyspark

Description

This script demonstrates how to add a new column derived from existing columns.

Here, column Transformed_Column is added by multiplying columns EstSalary and Tenure.

Sample Code

def add_column_using_other_columns(df, config_map):
    """
    Add new column using existing column in a dataframe
    :param df:
    :param config_map:
    :return:
    """

    df = df.withColumn('Transformed_Column', (df.EstSalary * df.Tenure))
    return df

Example

Input Data Set

CustomerId  Surname  CreditScore  Tenure  Age  Balance    EstSalary
15633059    Fanucci  413          9       34   0          6534.18
15604348    Allard   710          8       22   0          99645.04
15693683    Yuille   814          8       29   97086.4    197276.13
15738721    Graham   773          9       41   102827.44  64595.25

Output Data Set

CustomerId  Surname  CreditScore  Tenure  Age  Balance    EstSalary  Transformed_Column
15633059    Fanucci  413          9       34   0          6534.18    58807.62
15604348    Allard   710          8       22   0          99645.04   797160.32
15693683    Yuille   814          8       29   97086.4    197276.13  1578209.04
15738721    Graham   773          9       41   102827.44  64595.25   581357.25

Transform an existing column using Pyspark

Description

This script demonstrates how to transform a column.

Here, the values of the column Balance are rounded off and converted to integers.

Sample Code

from pyspark.sql.functions import round


def transform_column(df, config_map):
    """
    transform a column
    :param df:
    :param config_map:
    :return:
    """
    df = df.withColumn('Balance', round(df.Balance).cast('integer'))
    return df

Example

Input Data Set

CustomerId  Surname  CreditScore  Tenure  Age  Balance    EstSalary
15633059    Fanucci  413          9       34   0          6534.18
15604348    Allard   710          8       22   0          99645.04
15693683    Yuille   814          8       29   97086.4    197276.13
15738721    Graham   773          9       41   102827.44  64595.25

Output Data Set

CustomerId  Surname  CreditScore  Tenure  Age  Balance  EstSalary
15633059    Fanucci  413          9       34   0        6534.18
15604348    Allard   710          8       22   0        99645.04
15693683    Yuille   814          8       29   97086    197276.13
15738721    Graham   773          9       41   102827   64595.25

Transform an existing column using udf

Description

This script demonstrates how to transform a column using a PySpark UDF.

Here, the column Gender is encoded with integer values.

  1. Create a Python function that encodes the values of the column Gender as numeric values.

  2. Convert this function to a PySpark UDF.

  3. Apply this UDF to the column Gender and write the encoded values back to the same column.

Sample Code

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


def transform_column(df, config_map):

    def encode_gender(input_value):
        """
        encode gender to numeric values
        :param input_value:
        :return:
        """
        return_value = None
        if input_value is not None:
            if input_value.lower() == 'male':
                return_value = 1
            elif input_value.lower() == 'female':
                return_value = 2
        return return_value

    # create udf
    encode_gender_udf = udf(encode_gender, IntegerType())
    # apply udf to 'Gender' column
    df = df.withColumn('Gender', encode_gender_udf(df.Gender))
    return df
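
Python UDFs pass each row between the JVM and a Python worker, so the same encoding can often be expressed with built-in Spark SQL functions instead. The sketch below is an alternative to the UDF, not part of the original snippet; the function name is hypothetical, and it assumes the same Gender column and column names that do not themselves contain backticks.

```python
def transform_column_without_udf(df, config_map):
    """
    Encode Gender as 1 (male) / 2 (female) using a SQL CASE expression.
    Any other value, including null, becomes null, as in the UDF version.
    """
    gender_expr = (
        "CAST(CASE lower(Gender) "
        "WHEN 'male' THEN 1 WHEN 'female' THEN 2 END AS INT) AS Gender"
    )
    # keep every column in place, swapping only Gender for the expression
    select_list = [
        gender_expr if name == 'Gender' else f"`{name}`"
        for name in df.columns
    ]
    return df.selectExpr(*select_list)
```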

Example

Input Data Set

CustomerId  Surname  CreditScore  Gender  Age  Tenure  Balance    EstSalary
15633059    Fanucci  413          Male    34   9       0          6534.18
15604348    Allard   710          Male    22   8       0          99645.04
15693683    Yuille   814          Male    29   8       97086.4    197276.13
15738721    Graham   773          Male    41   9       102827.44  64595.25
15809248    Cole     524          Female  36   10      0          109614.57

Output Data Set

CustomerId  Surname  CreditScore  Gender  Age  Tenure  Balance    EstSalary
15633059    Fanucci  413          1       34   9       0          6534.18
15604348    Allard   710          1       22   8       0          99645.04
15693683    Yuille   814          1       29   8       97086.4    197276.13
15738721    Graham   773          1       41   9       102827.44  64595.25
15809248    Cole     524          2       36   10      0          109614.57

Apply transformation using pandas in batch pipeline

Description

This script demonstrates how to convert a PySpark DataFrame to a pandas DataFrame, transform it, and convert it back.

Here, the below approach is followed:

  1. First, convert the PySpark DataFrame to a pandas DataFrame using PySpark’s toPandas function. Note that toPandas collects all records to the driver.

  2. Then, encode the Gender column to numeric values (transformation).

  3. Finally, convert the pandas DataFrame back to a PySpark DataFrame.

Required libraries:

  1. numpy

  2. pandas

Sample Code

def encode_gender(input_value):
    """
    encode gender to numeric values
    :param input_value:
    :return:
    """
    return_value = None
    if input_value is not None:
        if input_value.lower() == 'male':
            return_value = 1
        elif input_value.lower() == 'female':
            return_value = 2
    return return_value


def transform_using_pandas(df, config_map):
    # get sql context
    sql_context = df.sql_ctx

    # convert pyspark df to pandas df
    pandas_df = df.toPandas()

    # encode Gender into numeric values
    pandas_df['Gender'] = list(map(encode_gender, pandas_df['Gender']))

    # convert pandas dataframe to pyspark data frame
    df = sql_context.createDataFrame(pandas_df)

    return df

Example

Input Data Set

CustomerId  Surname  CreditScore  Gender  Age  Tenure  Balance    EstSalary
15633059    Fanucci  413          Male    34   9       0          6534.18
15604348    Allard   710          Male    22   8       0          99645.04
15693683    Yuille   814          Male    29   8       97086.4    197276.13
15738721    Graham   773          Male    41   9       102827.44  64595.25
15809248    Cole     524          Female  36   10      0          109614.57

Output Data Set

CustomerId  Surname  CreditScore  Gender  Age  Tenure  Balance    EstSalary
15633059    Fanucci  413          1       34   9       0          6534.18
15604348    Allard   710          1       22   8       0          99645.04
15693683    Yuille   814          1       29   8       97086.4    197276.13
15738721    Graham   773          1       41   9       102827.44  64595.25
15809248    Cole     524          2       36   10      0          109614.57

Apply transformation using pandas in streaming pipeline (using pandas udf)

Description

This script demonstrates how to apply a transformation to a PySpark streaming DataFrame using pandas.

Here, the below approach is followed:

  1. First, a python function is created which does pandas transformation.

  2. This function is converted into pandas udf of pyspark.

  3. Apply this udf to input dataframe and get desired output dataframe.

Required libraries:

  1. numpy

  2. pandas

Sample Code

# import modules
from pyspark.sql.types import DoubleType, StructField, StringType, IntegerType, StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType


def transform_using_pandas(df, config_map):

    def change_data_types(input_df):
        """
        change data types of columns of pandas dataframe
        :param input_df:
        :return:
        """
        input_df.CustomerId = input_df.CustomerId.apply(int)
        input_df.Surname = input_df.Surname.apply(str)
        input_df.CreditScore = input_df.CreditScore.apply(int)
        input_df.Geography = input_df.Geography.apply(str)
        input_df.Gender = input_df.Gender.apply(str)
        input_df.Age = input_df.Age.apply(int)
        input_df.Tenure = input_df.Tenure.apply(int)
        input_df.Balance = input_df.Balance.apply(float)
        input_df.NumOfProducts = input_df.NumOfProducts.apply(int)
        input_df.HasCrCard = input_df.HasCrCard.apply(int)
        input_df.IsActiveMember = input_df.IsActiveMember.apply(int)
        input_df.EstimatedSalary = input_df.EstimatedSalary.apply(float)
        input_df.Exited = input_df.Exited.apply(int)
        return input_df

    def encode_gender(input_value):
        """
        encode gender to numeric values
        :param input_value:
        :return:
        """
        return_value = None
        if input_value is not None:
            if input_value.lower() == 'male':
                return_value = 1
            elif input_value.lower() == 'female':
                return_value = 2
        return return_value

    def apply_transformation(churn_df):
        # cast the columns of the pandas dataframe to the expected types
        churn_df = change_data_types(churn_df)
        # drop Geography column
        churn_df = churn_df.drop('Geography', axis=1)
        # encode values of Gender column
        churn_df['Gender'] = list(map(encode_gender, churn_df['Gender']))

        return churn_df

    # define output schema
    out_schema = StructType()
    out_schema.add(StructField('CustomerId', IntegerType(), False))
    out_schema.add(StructField('Surname', StringType(), False))
    out_schema.add(StructField('CreditScore', IntegerType(), False))
    out_schema.add(StructField('Gender', IntegerType(), False))
    out_schema.add(StructField('Age', IntegerType(), False))
    out_schema.add(StructField('Tenure', IntegerType(), False))
    out_schema.add(StructField('Balance', DoubleType(), False))
    out_schema.add(StructField('NumOfProducts', IntegerType(), False))
    out_schema.add(StructField('HasCrCard', IntegerType(), False))
    out_schema.add(StructField('IsActiveMember', IntegerType(), False))
    out_schema.add(StructField('EstimatedSalary', DoubleType(), False))
    out_schema.add(StructField('Exited', IntegerType(), False))

    # convert python function to pandas udf
    apply_transformation_udf = pandas_udf(apply_transformation, out_schema, PandasUDFType.GROUPED_MAP)

    # use pandas udf to dataframe
    df = df.groupby('NumOfProducts').apply(apply_transformation_udf)

    return df
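
Spark 3.0 introduced DataFrame.groupby(...).applyInPandas(...) as the preferred way to express a grouped-map pandas transformation. Assuming the processor runs on such a version (this topic does not state one), a reduced variant could look like the sketch below. It keeps only three columns for brevity, the function names are hypothetical, and whether it behaves the same in a streaming pipeline should be verified in your environment.

```python
def encode_gender_frame(pandas_df):
    """Plain pandas function: receives one group and returns a pandas DataFrame."""
    mapping = {'male': 1, 'female': 2}
    # keep three columns only, to keep the sketch short
    out = pandas_df[['CustomerId', 'Gender', 'NumOfProducts']].copy()
    # unmatched or missing values become NaN, so Gender is declared as double
    out['Gender'] = out['Gender'].str.lower().map(mapping).astype('float64')
    return out


def transform_using_apply_in_pandas(df, config_map):
    # schema as a DDL string; it must describe the frame returned above
    out_schema = "CustomerId int, Gender double, NumOfProducts int"
    return df.groupby('NumOfProducts').applyInPandas(encode_gender_frame, schema=out_schema)
```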

Example

Input Data Set

CustomerId  Surname  CreditScore  Geography  Gender  Age  Tenure  Balance    EstSalary
15633059    Fanucci  413          France     Male    34   9       0          6534.18
15604348    Allard   710          Spain      Male    22   8       0          99645.04
15693683    Yuille   814          Germany    Male    29   8       97086.4    197276.13
15738721    Graham   773          Spain      Male    41   9       102827.44  64595.25
15809248    Cole     524          France     Female  36   10      0          109614.57

Output Data Set

CustomerId  Surname  CreditScore  Gender  Age  Tenure  Balance    EstSalary
15633059    Fanucci  413          1       34   9       0          6534.18
15604348    Allard   710          1       22   8       0          99645.04
15693683    Yuille   814          1       29   8       97086.4    197276.13
15738721    Graham   773          1       41   9       102827.44  64595.25
15809248    Cole     524          2       36   10      0          109614.57

Make a POST web service call using Pyspark

Description

This script demonstrates how to make a POST web service call.

Here, for each record in the column CreditScore, the endpoint is called using the POST method.

The record is passed as JSON data in the POST request.

Config Parameters:

url: endpoint URL

Sample Code

def post_web_service_call(df, config_map):
    """
    make a POST web service call for each record of a column
    :param df:
    :param config_map:
    :return:
    """
    # imports
    import requests
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType, StringType, DoubleType

    # get url from config map
    url = config_map.get('url', '')

    def post_call(value):
        return_value = value
        if url:
            request_data = {'post_call_input': value}
            # hit end point
            r = requests.post(url, json=request_data)
            # check status
            if r.status_code == 200:
                return_value = str(r.text)
        return return_value

    # for now, a string value is returned from the udf;
    # this can be changed to IntegerType/DoubleType based on what the udf returns
    post_call_udf = udf(post_call, StringType())

    # pass a required column to udf (here using CreditScore)
    df = df.withColumn('Post_Call_Output', post_call_udf(df.CreditScore))
    return df
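
The snippet above calls requests.post without a timeout, so a slow endpoint can stall the task. The helper below is a sketch of one way to bound the wait and fall back to None on failure. The name post_value, the 10-second default, and the injectable post argument (present only so the helper can be exercised without a network) are assumptions, not part of the processor.

```python
def post_value(url, value, timeout=10, post=None):
    """
    POST a single value as JSON and return the response body as text.
    Returns None when the URL is empty, the call fails, or the status is not 200.
    """
    if not url:
        return None
    if post is None:
        # imported lazily so the helper can be tested with a fake 'post'
        import requests
        post = requests.post
    try:
        response = post(url, json={'post_call_input': value}, timeout=timeout)
    except Exception:
        # network errors, timeouts, invalid URLs, ...
        return None
    return response.text if response.status_code == 200 else None
```

Inside the processor function, such a helper could be wrapped as udf(lambda v: post_value(url, v), StringType()) and applied to the CreditScore column in the same way as post_call_udf.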

Example

Input Data Set

CustomerId  Surname  CreditScore  Gender  Age  Tenure  Balance    EstSalary
15633059    Fanucci  413          Male    34   9       0          6534.18
15604348    Allard   710          Male    22   8       0          99645.04
15693683    Yuille   814          Male    29   8       97086.4    197276.13
15738721    Graham   773          Male    41   9       102827.44  64595.25

Output Data Set

CustomerId  Surname  CreditScore  Gender  Age  Tenure  Balance    EstSalary  Post_Call_Output
15633059    Fanucci  413          Male    34   9       0          6534.18    Receive Credit Score : 413
15604348    Allard   710          Male    22   8       0          99645.04   Receive Credit Score : 710
15693683    Yuille   814          Male    29   8       97086.4    197276.13  Receive Credit Score : 814
15738721    Graham   773          Male    41   9       102827.44  64595.25   Receive Credit Score : 773

Extract top-N records using Pyspark

Description

This script demonstrates how to select the top N records from a DataFrame.

Here, the top 50 records are extracted based on estimated salary (the column EstSalary).

Sample Code

def extract_top_n_records(df, config_map):
    """
    Extract top n records
    :param df:
    :param config_map:
    :return:
    """
    top_records_count = 50
    df = df.sort(df.EstSalary.desc()).limit(top_records_count)
    return df

Example

Input Data Set

CustomerId  Surname  CreditScore  Gender  Age  Tenure  Balance    EstSalary
15633059    Fanucci  413          Male    34   9       0          6534.18
15604348    Allard   710          Male    22   8       0          99645.04
15693683    Yuille   814          Male    29   8       97086.4    197276.13
15738721    Graham   773          Male    41   9       102827.44  64595.25

Output Data Set

CustomerId  Surname     CreditScore  Gender  Age  Tenure  Balance    EstSalary
15616550    Chidiebele  698          Male    44   10      116363.37  198059.16
15693683    Yuille      814          Male    29   8       97086.4    197276.13
15641582    Chibugo     735          Male    43   10      123180.01  196673.28
15755196    Lavine      834          Female  49   2       131394.56  194365.76

Drop/Delete a column using Pyspark

Description

This script demonstrates how to drop a column from a dataframe.

Here, we are dropping the column Gender from the input dataframe.

Sample Code

def drop_column(df, config_map):
    """
    Drop a column
    :param df:
    :param config_map:
    :return:
    """
    df = df.drop('Gender')
    return df

Example

Input Data Set

CustomerId  Surname  CreditScore  Gender  Age  Tenure  Balance    EstSalary
15633059    Fanucci  413          Male    34   9       0          6534.18
15604348    Allard   710          Male    22   8       0          99645.04
15693683    Yuille   814          Male    29   8       97086.4    197276.13
15738721    Graham   773          Male    41   9       102827.44  64595.25

Output Data Set

CustomerId  Surname  CreditScore  Age  Tenure  Balance    EstSalary
15633059    Fanucci  413          34   9       0          6534.18
15604348    Allard   710          22   8       0          99645.04
15693683    Yuille   814          29   8       97086.4    197276.13
15738721    Graham   773          41   9       102827.44  64595.25

Web Scraping using Python

Description

This script has a sample code snippet for scraping data from a website. The input DataFrame contains the URLs of the web pages to be scraped in a column named ‘url_path’. The Python processor visits each URL and extracts data from the web page. The extracted data can be used for further processing.

Required Libraries: beautifulsoup4, requests

Sample Code

import requests
from bs4 import BeautifulSoup
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def get_web_page_content(df, config_map):
    """
    parse url present in input df
    :param df:
    :param config_map:
    :return:
    """
    def process_url(url):
        # Send a GET request to the URL
        response = requests.get(url)

        # Return None (null in the output column) if the request was not successful
        if response.status_code != 200:
            return None

        # Parse the HTML content of the page
        soup = BeautifulSoup(response.text, 'html.parser', preserve_whitespace_tags=["p"])
        # Prefer the main documentation container, else fall back to the content div
        content = soup.find('main', id='docsContent')
        if content is None:
            content = soup.find('div', class_='content')
        # Guard against pages that contain neither element
        return content.get_text() if content is not None else None

    # create udf
    process_url_udf = udf(process_url, StringType())
    df = df.withColumn('url_content', process_url_udf(df.url_path))
    return df

Example

Input Data Set

url_path
https://docs.gathr.one/gathr-saas/docs/introduction/
https://docs.gathr.one/gathr-saas/docs/introduction/about_gathr/
https://docs.gathr.one/gathr-saas/docs/introduction/faqs/

Output Data Set

url_contenturl_path
IntroductionAbout Gathr →Core Concepts →Free Trials →Security →FAQs →https://docs.gathr.one/gathr-saas/docs/introduction/
HomeDocsIntroductionAbout GathrAbout Gathr In this article ExpandCollapseSupported BrowsersIn this articleSupported BrowsersWelcome to Gathr! An enterprise grade Ingestion, ETL and analytics platform for data processing. This platform is based on the best of breed open-source technologies and supports the end-to-end functionality of data ingestion, enrichment, machine learning, action triggers and visualization. Leverage the intuitive visual interface to develop and deploy data ingestion and Change Data Capture (CDC) applications up to ten times faster than any other platform, across industries, dataformats, and use cases.As it is a SaaS offering there are absolutely no prerequisites related to installation and setup.You can simply get registered to utilize template solutions that are offered by Gathr and customized to your data ingestion, changed data capture and ETL pipeline needs.For more details about how to set up a Gathr account, see User Sign Up Supported Browsers #Gathr is currently certified to be used with Google Chrome and Safari browsers.If you have any feedback on Gathr documentation, please email us!Core Concepts →https://docs.gathr.one/gathr-saas/docs/introduction/about_gathr/
HomeDocsIntroductionFAQsFAQs In this article ExpandCollapseAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsIn this articleAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsSome most common questions about various Gathr related features have been answered in this section.AWS Compute Environments FAQs #Q: How can I connect from the AWS region which is not listed in the Compute Setup?In case if the preferred region where you want to host the compute environments is not supported by Gathr, please write to our support team at:saas-support@gathr.oneAlternatively, you can use VPC peering for clusters in this region.Q: Can I switch an Ingestion or CDC Application configured to run in Gathr’s account to run in my account?Yes. A pipeline created with tunnel/upload file feature for source or target, configured to run in Gathr’s account can be reconfigured to run in your account.Prerequisite: Source/target and EMR compute should be configured in the same region, else (optional) you can create VPC peering between regions in your account.Q: Can I switch an Ingestion or CDC Application configured to run in my account to run in Gathr’s account?Yes. A pipeline created with tunnel for source or target, configured to run in your account can be reconfigured to run in Gathr’s account in the same region.No. A pipeline created with upload file feature for source or target configured to run in your account during creation cannot be reconfigured to run in Gathr’s account.Q: Will I be charged differently if I am running pipelines in my Account?Yes. Please contact our support team for more details.saas-support@gathr.oneAzure Databricks Compute Environments FAQs #Q: What costs are associated with using Databricks in Gathr? The costs primarily consist of the infrastructure and cloud-related expenses incurred on your Databricks account. This includes the charges levied by Databricks for the resources consumed during the active period of your clusters. 
Gathr, however, does not impose any additional charges for designing or deploying pipelines within the platform. Q: Are there any additional charges from Gathr for using Databricks? You will be only charged for the credit points consumed during application execution. Gathr does not impose any supplementary charges for designing or deploying applications on Databricks. Q: How can I register my Azure Databricks account with Gathr? To register your Azure Databricks account with Gathr, navigate to your User Profile > Compute Setup tab, click on ADD CLOUD SERVICE ACCOUNT, select Azure Databricks as the Account Type, and provide the required details, including Account Name, Instance URL, Access Token, and DBFS Gathr Metadata Repository Path. Then simply save the account details. Q: How do I link a compute environment to a project after registering a Databricks account? Administrators can link registered compute accounts to specific projects within Gathr. This allows project users to utilize the registered clusters for deploying their applications. For detailed steps, refer to Link Compute Environment to Project Q: What is the difference between the Gathr and Databricks engines in Gathr? The Gathr engine is the default computing environment for designing and deploying applications within the Gathr platform. The Databricks engine, when registered, enables users to use clusters that can be launched from Gathr on their Databricks workspaces. Q: Can I switch between the Gathr and Databricks engines at any time? Yes, users can switch between the Gathr and Databricks engines based on their specific requirements. However, switching is not allowed while actively designing applications within the interface.If you have any feedback on Gathr documentation, please email us!← SecurityUser Sign Up →https://docs.gathr.one/gathr-saas/docs/introduction/faqs/

Text Chunking using Textwrap

Description

This script has a sample code snippet that makes extracted text more readable and manageable by splitting it into an array of chunks. It uses the ’textwrap’ module to break the text into smaller, well-defined chunks, which makes the data easier to process, analyze, or present. Chunking also lets you isolate specific sections for closer inspection, which is handy when dealing with a large volume of data.

Required Libraries: textwrap (part of the Python standard library)

The custom_text_splitter function splits the input text into chunks using a chunk size of 1500 characters and an overlap of 200 characters; a separator (’\n’) is also passed in, although the sample code does not use it. Each chunk is then placed in a dictionary, and the resulting dictionaries are converted to JSON strings. The sample code below writes ’text’ and ‘inputValue’ keys, whereas the Output Data Set shows ’text’, ‘url’, and ‘id’ keys, with the ‘id’ generated from the URL.

Sample Code

import json
import textwrap

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType


def create_chunks_using_textwrap(df, config_map):
    """
    create chunks of input text
    :param df:
    :param config_map:
    :return:
    """

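    def create_chunks(input_value):
        # nothing to split when the input column is null
        if input_value is None:
            return None

        def custom_text_splitter(input_value, sep, chunk_size, chunk_overlap):
            # textwrap.wrap breaks on whitespace; 'sep' is accepted but not used
            chunks = textwrap.wrap(input_value, width=chunk_size, expand_tabs=False)
            if chunk_overlap > 0:
                # prepend the tail of the previous chunk to each later chunk
                for i in range(1, len(chunks)):
                    chunks[i] = chunks[i - 1][-chunk_overlap:] + chunks[i]
            # return the chunks whether or not an overlap was applied
            return chunks

        chunks = custom_text_splitter(input_value, sep="\n", chunk_size=1500, chunk_overlap=200)
        new_data = []
        for chunk in chunks:
            # avoid shadowing the built-in 'dict'
            chunk_record = {
                "text": chunk,
                "inputValue": input_value,
            }
            new_data.append(json.dumps(chunk_record))
        return new_data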

    create_chunks_udf = udf(create_chunks, ArrayType(StringType()))
    df = df.withColumn('chunks', create_chunks_udf(df.url_content))
    return df
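
The overlap step can be tried outside Spark with plain Python. The sketch below is a standalone variant of custom_text_splitter, not the processor code itself: the name split_with_overlap is hypothetical, and it takes the overlap from the untouched previous chunk rather than modifying the list in place.

```python
import textwrap


def split_with_overlap(text, chunk_size, chunk_overlap):
    """
    Wrap text into chunks of at most chunk_size characters, then prefix
    every chunk after the first with the tail of the chunk before it.
    """
    chunks = textwrap.wrap(text, width=chunk_size, expand_tabs=False)
    if chunk_overlap <= 0:
        return chunks
    result = chunks[:1]
    # pair each chunk with its predecessor from the original list
    for previous, current in zip(chunks, chunks[1:]):
        result.append(previous[-chunk_overlap:] + current)
    return result


print(split_with_overlap("alpha beta gamma delta epsilon", chunk_size=11, chunk_overlap=4))
```

With a width of 11 characters and an overlap of 4, the text is wrapped into "alpha beta", "gamma delta", and "epsilon", and the second and third chunks are then prefixed with "beta" and "elta" respectively.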

Example

Input Data Set

url_contenturl_path
IntroductionAbout Gathr →Core Concepts →Free Trials →Security →FAQs →https://docs.gathr.one/gathr-saas/docs/introduction/
HomeDocsIntroductionAbout GathrAbout Gathr In this article ExpandCollapseSupported BrowsersIn this articleSupported BrowsersWelcome to Gathr! An enterprise grade Ingestion, ETL and analytics platform for data processing. This platform is based on the best of breed open-source technologies and supports the end-to-end functionality of data ingestion, enrichment, machine learning, action triggers and visualization. Leverage the intuitive visual interface to develop and deploy data ingestion and Change Data Capture (CDC) applications up to ten times faster than any other platform, across industries, dataformats, and use cases.As it is a SaaS offering there are absolutely no prerequisites related to installation and setup.You can simply get registered to utilize template solutions that are offered by Gathr and customized to your data ingestion, changed data capture and ETL pipeline needs.For more details about how to set up a Gathr account, see User Sign Up →Supported Browsers #Gathr is currently certified to be used with Google Chrome and Safari browsers.If you have any feedback on Gathr documentation, please email us!Core Concepts →https://docs.gathr.one/gathr-saas/docs/introduction/about_gathr/
HomeDocsIntroductionFAQsFAQs In this article ExpandCollapseAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsIn this articleAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsSome most common questions about various Gathr related features have been answered in this section.AWS Compute Environments FAQs #Q: How can I connect from the AWS region which is not listed in the Compute Setup?In case if the preferred region where you want to host the compute environments is not supported by Gathr, please write to our support team at:saas-support@gathr.oneAlternatively, you can use VPC peering for clusters in this region.Q: Can I switch an Ingestion or CDC Application configured to run in Gathr’s account to run in my account?Yes. A pipeline created with tunnel/upload file feature for source or target, configured to run in Gathr’s account can be reconfigured to run in your account.Prerequisite: Source/target and EMR compute should be configured in the same region, else (optional) you can create VPC peering between regions in your account.Q: Can I switch an Ingestion or CDC Application configured to run in my account to run in Gathr’s account?Yes. A pipeline created with tunnel for source or target, configured to run in your account can be reconfigured to run in Gathr’s account in the same region.No. A pipeline created with upload file feature for source or target configured to run in your account during creation cannot be reconfigured to run in Gathr’s account.Q: Will I be charged differently if I am running pipelines in my Account?Yes. Please contact our support team for more details.saas-support@gathr.oneAzure Databricks Compute Environments FAQs #Q: What costs are associated with using Databricks in Gathr? The costs primarily consist of the infrastructure and cloud-related expenses incurred on your Databricks account. This includes the charges levied by Databricks for the resources consumed during the active period of your clusters. 
Gathr, however, does not impose any additional charges for designing or deploying pipelines within the platform. Q: Are there any additional charges from Gathr for using Databricks? You will be only charged for the credit points consumed during application execution. Gathr does not impose any supplementary charges for designing or deploying applications on Databricks. Q: How can I register my Azure Databricks account with Gathr? To register your Azure Databricks account with Gathr, navigate to your User Profile > Compute Setup tab, click on ADD CLOUD SERVICE ACCOUNT, select Azure Databricks as the Account Type, and provide the required details, including Account Name, Instance URL, Access Token, and DBFS Gathr Metadata Repository Path. Then simply save the account details. Q: How do I link a compute environment to a project after registering a Databricks account? Administrators can link registered compute accounts to specific projects within Gathr. This allows project users to utilize the registered clusters for deploying their applications. For detailed steps, refer to Link Compute Environment to Project → Q: What is the difference between the Gathr and Databricks engines in Gathr? The Gathr engine is the default computing environment for designing and deploying applications within the Gathr platform. The Databricks engine, when registered, enables users to use clusters that can be launched from Gathr on their Databricks workspaces. Q: Can I switch between the Gathr and Databricks engines at any time? Yes, users can switch between the Gathr and Databricks engines based on their specific requirements. However, switching is not allowed while actively designing applications within the interface.If you have any feedback on Gathr documentation, please email us!← SecurityUser Sign Up →https://docs.gathr.one/gathr-saas/docs/introduction/faqs/

Output Data Set

url_contentchunksurl_path
IntroductionAbout Gathr →Core Concepts →Free Trials →Security →FAQs →[{“text”: “IntroductionAbout Gathr \u2192Core Concepts \u2192Free Trials \u2192Security \u2192FAQs \u2192”, “url”: “https://docs.gathr.one/gathr-saas/docs/introduction/”, “id”: “https_docs.gathr.one_gathr-saas_docs_introduction__0”}]https://docs.gathr.one/gathr-saas/docs/introduction/
HomeDocsIntroductionAbout GathrAbout Gathr In this article ExpandCollapseSupported BrowsersIn this articleSupported BrowsersWelcome to Gathr! An enterprise grade Ingestion, ETL and analytics platform for data processing. This platform is based on the best of breed open-source technologies and supports the end-to-end functionality of data ingestion, enrichment, machine learning, action triggers and visualization. Leverage the intuitive visual interface to develop and deploy data ingestion and Change Data Capture (CDC) applications up to ten times faster than any other platform, across industries, dataformats, and use cases.As it is a SaaS offering there are absolutely no prerequisites related to installation and setup.You can simply get registered to utilize template solutions that are offered by Gathr and customized to your data ingestion, changed data capture and ETL pipeline needs.For more details about how to set up a Gathr account, see User Sign Up →Supported Browsers #Gathr is currently certified to be used with Google Chrome and Safari browsers.If you have any feedback on Gathr documentation, please email us!Core Concepts →[{“text”: “HomeDocsIntroductionAbout GathrAbout Gathr In this article ExpandCollapseSupported BrowsersIn this articleSupported BrowsersWelcome to Gathr! An enterprise grade Ingestion, ETL and analytics platform for data processing. This platform is based on the best of breed open-source technologies and supports the end-to-end functionality of data ingestion, enrichment, machine learning, action triggers and visualization. 
Leverage the intuitive visual interface to develop and deploy data ingestion and Change Data Capture (CDC) applications up to ten times faster than any other platform, across industries, dataformats, and use cases.As it is a SaaS offering there are absolutely no prerequisites related to installation and setup.You can simply get registered to utilize template solutions that are offered by Gathr and customized to your data ingestion, changed data capture and ETL pipeline needs.For more details about how to set up a Gathr account, see User Sign Up \u00e2\u0086\u0092Supported Browsers #Gathr is currently certified to be used with Google Chrome and Safari browsers.If you have any feedback on Gathr documentation, please email us!Core Concepts \u2192”, “url”: “https://docs.gathr.one/gathr-saas/docs/introduction/about_gathr/”, “id”: “https_docs.gathr.one_gathr-saas_docs_introduction_about_gathr__0”}]https://docs.gathr.one/gathr-saas/docs/introduction/about_gathr/
HomeDocsIntroductionFAQsFAQs In this article ExpandCollapseAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsIn this articleAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsSome most common questions about various Gathr related features have been answered in this section.AWS Compute Environments FAQs #Q: How can I connect from the AWS region which is not listed in the Compute Setup?In case if the preferred region where you want to host the compute environments is not supported by Gathr, please write to our support team at:saas-support@gathr.oneAlternatively, you can use VPC peering for clusters in this region.Q: Can I switch an Ingestion or CDC Application configured to run in Gathr’s account to run in my account?Yes. A pipeline created with tunnel/upload file feature for source or target, configured to run in Gathr’s account can be reconfigured to run in your account.Prerequisite: Source/target and EMR compute should be configured in the same region, else (optional) you can create VPC peering between regions in your account.Q: Can I switch an Ingestion or CDC Application configured to run in my account to run in Gathr’s account?Yes. A pipeline created with tunnel for source or target, configured to run in your account can be reconfigured to run in Gathr’s account in the same region.No. A pipeline created with upload file feature for source or target configured to run in your account during creation cannot be reconfigured to run in Gathr’s account.Q: Will I be charged differently if I am running pipelines in my Account?Yes. Please contact our support team for more details.saas-support@gathr.oneAzure Databricks Compute Environments FAQs #Q: What costs are associated with using Databricks in Gathr? The costs primarily consist of the infrastructure and cloud-related expenses incurred on your Databricks account. This includes the charges levied by Databricks for the resources consumed during the active period of your clusters. 
Gathr, however, does not impose any additional charges for designing or deploying pipelines within the platform. Q: Are there any additional charges from Gathr for using Databricks? You will be only charged for the credit points consumed during application execution. Gathr does not impose any supplementary charges for designing or deploying applications on Databricks. Q: How can I register my Azure Databricks account with Gathr? To register your Azure Databricks account with Gathr, navigate to your User Profile > Compute Setup tab, click on ADD CLOUD SERVICE ACCOUNT, select Azure Databricks as the Account Type, and provide the required details, including Account Name, Instance URL, Access Token, and DBFS Gathr Metadata Repository Path. Then simply save the account details. Q: How do I link a compute environment to a project after registering a Databricks account? Administrators can link registered compute accounts to specific projects within Gathr. This allows project users to utilize the registered clusters for deploying their applications. For detailed steps, refer to Link Compute Environment to Project → Q: What is the difference between the Gathr and Databricks engines in Gathr? The Gathr engine is the default computing environment for designing and deploying applications within the Gathr platform. The Databricks engine, when registered, enables users to use clusters that can be launched from Gathr on their Databricks workspaces. Q: Can I switch between the Gathr and Databricks engines at any time? Yes, users can switch between the Gathr and Databricks engines based on their specific requirements. 
However, switching is not allowed while actively designing applications within the interface.If you have any feedback on Gathr documentation, please email us!← SecurityUser Sign Up →[{“text”: “HomeDocsIntroductionFAQsFAQs In this article ExpandCollapseAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsIn this articleAWS Compute Environments FAQsAzure Databricks Compute Environments FAQsSome most common questions about various Gathr related features have been answered in this section.AWS Compute Environments FAQs #Q: How can I connect from the AWS region which is not listed in the Compute Setup?In case if the preferred region where you want to host the compute environments is not supported by Gathr, please write to our support team at:saas-support@gathr.oneAlternatively, you can use VPC peering for clusters in this region.Q: Can I switch an Ingestion or CDC Application configured to run in Gathr\u00e2\u0080\u0099s account to run in my account?Yes. A pipeline created with tunnel/upload file feature for source or target, configured to run in Gathr\u00e2\u0080\u0099s account can be reconfigured to run in your account.Prerequisite: Source/target and EMR compute should be configured in the same region, else (optional) you can create VPC peering between regions in your account.Q: Can I switch an Ingestion or CDC Application configured to run in my account to run in Gathr\u00e2\u0080\u0099s account?Yes. A pipeline created with tunnel for source or target, configured to run in your account can be reconfigured to run in Gathr\u00e2\u0080\u0099s account in the same region.No. A pipeline created with upload file feature for source or target configured to run in your account during creation cannot be reconfigured to”, “url”: “https://docs.gathr.one/gathr-saas/docs/introduction/faqs/”, “id”: “https_docs.gathr.one_gathr-saas_docs_introduction_faqs__0”},{“text”: “nfigured to run in Gathr\u00e2\u0080\u0099s account in the same region.No. 
A pipeline created with upload file feature for source or target configured to run in your account during creation cannot be reconfigured torun in Gathr\u00e2\u0080\u0099s account.Q: Will I be charged differently if I am running pipelines in my Account?Yes. Please contact our support team for more details.saas-support@gathr.oneAzure Databricks Compute Environments FAQs #Q: What costs are associated with using Databricks in Gathr? The costs primarily consist of the infrastructure and cloud-related expenses incurred on your Databricks account. This includes the charges levied by Databricks for the resources consumed during the active period of your clusters. Gathr, however, does not impose any additional charges for designing or deploying pipelines within the platform. Q: Are there any additional charges from Gathr for using Databricks? You will be only charged for the credit points consumed during application execution. Gathr does not impose any supplementary charges for designing or deploying applications on Databricks. Q: How can I register my Azure Databricks account with Gathr? To register your Azure Databricks account with Gathr, navigate to your User Profile > Compute Setup tab, click on ADD CLOUD SERVICE ACCOUNT, select Azure Databricks as the Account Type, and provide the required details, including Account Name, Instance URL, Access Token, and DBFS Gathr Metadata Repository Path. Then simply save the account details. Q: How do I link a compute environment to a project after registering a Databricks account? Administrators can link registered compute accounts to specific projects within Gathr. This allows project users”, “url”: “https://docs.gathr.one/gathr-saas/docs/introduction/faqs/”, “id”: “https_docs.gathr.one_gathr-saas_docs_introduction_faqs__1”},{“text”: “ow do I link a compute environment to a project after registering a Databricks account? Administrators can link registered compute accounts to specific projects within Gathr. 
This allows project usersto utilize the registered clusters for deploying their applications. For detailed steps, refer to Link Compute Environment to Project \u00e2\u0086\u0092 Q: What is the difference between the Gathr and Databricks engines in Gathr? The Gathr engine is the default computing environment for designing and deploying applications within the Gathr platform. The Databricks engine, when registered, enables users to use clusters that can be launched from Gathr on their Databricks workspaces. Q: Can I switch between the Gathr and Databricks engines at any time? Yes, users can switch between the Gathr and Databricks engines based on their specific requirements. However, switching is not allowed while actively designing applications within the interface.If you have any feedback on Gathr documentation, please email us!\u2190 SecurityUser Sign Up \u2192”, “url”: “https://docs.gathr.one/gathr-saas/docs/introduction/faqs/”, “id”: “https_docs.gathr.one_gathr-saas_docs_introduction_faqs__2”}]https://docs.gathr.one/gathr-saas/docs/introduction/faqs/

Parse docx, pptx, xlsx and Images

Description

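This script provides a PySpark function that extracts text from base64-encoded binary content held in a DataFrame column. It reads the base64EncodedBinaryContent column, tries to parse each value as a docx, pptx or xlsx document and then as an image (using Tesseract OCR), and writes the extracted text to a new chunks column.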

Required Libraries: docx2txt, python-pptx, openpyxl, pytesseract, Pillow (imported in the code as PIL)
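
The function expects each row's file content as a base64 string. As a minimal sketch of that round trip, the following uses only the standard library to encode raw bytes and decode them back into a seekable in-memory stream of the kind the parsers consume. The helper names to_base64_text and to_byte_stream are hypothetical and are not part of Gathr.

```python
import base64
import io


def to_base64_text(raw_bytes):
    """Encode raw file bytes as an ASCII base64 string (hypothetical helper)."""
    return base64.b64encode(raw_bytes).decode("ascii")


def to_byte_stream(base64_text):
    """Decode a base64 string into a seekable in-memory stream (hypothetical helper)."""
    return io.BytesIO(base64.b64decode(base64_text))


# Illustrative bytes only; a real row would hold the full content of a file.
sample = b"PK\x03\x04 example bytes"
encoded = to_base64_text(sample)
stream = to_byte_stream(encoded)
```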

Sample Code

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import base64
import io
import docx2txt
from pptx import Presentation
from PIL import Image
import pytesseract
from openpyxl import load_workbook

def get_text(df, config_map):
    # Define the UDF
    def extract_text_from_base64(base64_encoded_string):
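        # Null rows cannot be decoded; pass them through as null
        if base64_encoded_string is None:
            return None
        # Decode base64-encoded data
        decoded_data = base64.b64decode(base64_encoded_string)
        # Wrap the decoded bytes in a seekable in-memory stream that each parser can read
        byte_stream = io.BytesIO(decoded_data)
        # Identify the file format by trying each parser in turn; the first that succeeds wins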
        try:
            # Try extracting text from DOCX
            docx_text = docx2txt.process(byte_stream)
            return " "+docx_text
        except Exception:
            pass
        try:
            # Try extracting text from PPTX
            presentation = Presentation(byte_stream)
            pptx_text = ""
            for slide in presentation.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        pptx_text += shape.text + "\n"
            return " "+pptx_text
        except Exception:
            pass
        try:
            # Try extracting text from XLSX: load the workbook in read-only mode
            workbook = load_workbook(byte_stream, read_only=True)
            # Initialize an empty string to store extracted text
            extracted_text = ""
            # Iterate through all sheets in the workbook
            for sheet_name in workbook.sheetnames:
                sheet = workbook[sheet_name]
                # Iterate through all rows in the sheet
                for row in sheet.iter_rows(values_only=True):
                    # Convert each cell value to string and append to the extracted text
                    row_text = ' '.join(str(cell) for cell in row if cell is not None)
                    extracted_text += row_text + '\n'
            return " "+extracted_text
        except Exception:
            pass

        try:
            # Try extracting text from image using Tesseract OCR
            image = Image.open(byte_stream)
            image_text = pytesseract.image_to_string(image)
            return " "+image_text
        except Exception:
            pass
        return "Unknown format: Unable to extract text."
    # Wrap the Python function as a PySpark UDF that returns a string
    extract_text_udf = udf(extract_text_from_base64, StringType())
    # Apply the UDF using withColumn
    df = df.withColumn('chunks', extract_text_udf(df.base64EncodedBinaryContent))
    return df
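
The sample code identifies the format by trying each parser in turn. One possible alternative is to inspect the content before picking a parser. The sketch below assumes that docx, pptx and xlsx files are zip packages containing the conventional parts word/document.xml, ppt/presentation.xml and xl/workbook.xml. The function name sniff_format and its return labels are hypothetical, and the sketch uses only the standard library.

```python
import base64
import io
import zipfile

# Conventional main-part names inside Office Open XML packages (assumed layout).
_OOXML_MARKERS = {
    "word/document.xml": "docx",
    "ppt/presentation.xml": "pptx",
    "xl/workbook.xml": "xlsx",
}


def sniff_format(base64_encoded_string):
    """Return 'docx', 'pptx', 'xlsx' or 'other'; None for missing or invalid input."""
    if base64_encoded_string is None:
        return None
    try:
        data = base64.b64decode(base64_encoded_string)
    except (ValueError, TypeError):
        # binascii.Error (bad padding) is a subclass of ValueError
        return None
    stream = io.BytesIO(data)
    # Anything that is not a zip archive cannot be docx, pptx or xlsx
    if not zipfile.is_zipfile(stream):
        return "other"
    stream.seek(0)
    with zipfile.ZipFile(stream) as archive:
        names = set(archive.namelist())
    for marker, fmt in _OOXML_MARKERS.items():
        if marker in names:
            return fmt
    return "other"
```

A result of "other" would still need to go through the image (OCR) branch, since this check says nothing about image formats.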

Example

Input Data Set


| path | modificationTime | length | base64EncodedBinaryContent |
| --- | --- | --- | --- |
| s3a://example/testfile.doc | 2024-01-09 09:03:10 | 250880 | 0M8R4KGxGuEAAAAAAAAAAAAAAAAAAAAAOwADAP7 |

Output Data Set

| path | modificationTime | length | base64EncodedBinaryContent | output |
| --- | --- | --- | --- | --- |
| s3a://example/testfile.doc | 2024-01-09 09:03:10 | 250880 | 0M8R4KGxGuEAAAAAAAAAAAAAAAAAAAAAOwADAP7 | This is Parsed Output |