Elasticsearch Emitter

The Elasticsearch emitter lets you store data in Elasticsearch indexes.

Elasticsearch Emitter Configuration

To add an Elasticsearch emitter to your pipeline, drag the emitter onto the canvas and connect it to a Data Source or processor. The configuration settings are as follows:

Field: Description
Connection Name: All Elasticsearch connections are listed here. Select the connection to use for Elasticsearch.
Output Message: The output message to be indexed.
Index Nested JSON

Select the checkbox if nested JSON fields are to be indexed.

If the checkbox is not selected, three additional fields are displayed: Index Number of Shards, Index Replication Factor, and Output Fields.

If the checkbox is selected, these three fields are hidden and the following note is displayed:

“Index will be created with ElasticSearch default 5 shards and 1 replication factor.”

Index Number of Shards: Number of shards to be created in the index store.
Index Replication Factor: Number of additional copies of the data.
Index Name

Specify the index name where data is to be indexed.

The index name can also be built from a document/data field, like emp_{document_field}.

Here, {document_field} is replaced at runtime by the value of that document field in the particular record.

The index name should be in lower case and follow the naming conventions of Elasticsearch.

Dynamic index creation works only when the following conditions are fulfilled:

1. The Index Nested JSON checkbox is selected on the Elasticsearch emitter.

2. action.auto_create_index: true is set in the Elasticsearch cluster.

3. The field data is always in lower case; otherwise, the pipeline fails.
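The placeholder substitution described above can be sketched in plain Python. This is only an illustration of the idea, not the emitter's actual code: the function name `resolve_index_name`, the regular expression, and the choice to raise an error on upper-case values are all assumptions.

```python
import re

# Matches placeholders such as {document_field} (hypothetical syntax check).
PLACEHOLDER = re.compile(r"\{(\w+)\}")


def resolve_index_name(template: str, record: dict) -> str:
    """Replace each {field} placeholder with that record's field value."""

    def substitute(match):
        field = match.group(1)
        if field not in record:
            raise KeyError(f"record has no field {field!r}")
        return str(record[field])

    name = PLACEHOLDER.sub(substitute, template)
    # The text above says index names must be lower case; this sketch
    # rejects the record rather than silently lower-casing the value.
    if name != name.lower():
        raise ValueError(f"index name {name!r} is not lower case")
    return name


print(resolve_index_name("emp_{dept}", {"dept": "sales", "id": 7}))
```

With the record shown, the template emp_{dept} resolves to emp_sales, while a value such as "Sales" would be rejected.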

Index Type

Index Type could be either Static or Dynamic.

Example of Dynamic Index Type:

Index type can also be created using the document/data field like emp_{document_field}.

Here {document_field} will be replaced during runtime by the value of document field of that record.

Notes:

1. If the index type is left blank, the default index type is index name + “_type”.

2. The index type should follow the naming conventions of Elasticsearch.

3. If the index name is static, the index type must also be static.

Checkpoint Storage Location: Select the checkpointing storage location. Available options are HDFS, S3, and EFS.
Checkpoint Connections: Select the connection. Connections are listed corresponding to the selected storage location.
Checkpoint Directory

The path where the Spark application stores the checkpointing data.

For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system adds a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-based Check Point: Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each pipeline run the checkpoint location provided above is appended with the current time in milliseconds.
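As a rough illustration of the time-based checkpoint option, the sketch below appends the current time in milliseconds to a checkpoint directory. The function name `checkpoint_path` and the use of "/" as the separator are assumptions; the product may join the timestamp differently.

```python
import time
from typing import Optional


def checkpoint_path(base: str, time_based: bool,
                    now_ms: Optional[int] = None) -> str:
    """Return the checkpoint directory to use for one pipeline run."""
    if not time_based:
        return base
    if now_ms is None:
        # Current time in milliseconds since the Unix epoch.
        now_ms = int(time.time() * 1000)
    # Assumption: the timestamp becomes a sub-directory of the base path.
    return f"{base.rstrip('/')}/{now_ms}"


print(checkpoint_path("/user/hadoop/checkpointingDir", time_based=True))
```

Because the suffix changes on every run, each run starts from a fresh checkpoint location instead of reusing the state of a previous run.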
ID Field Name: Specify a name for the generated ID field.
ID Generator Type

Determines how the ID field is generated.

The following types of ID generators are available:

UUID: Universally unique identifier.

Field Values based: In this case, ID is generated by appending the values of selected fields.

If you select this option, an additional field, “Key Fields”, is displayed, where you select the fields you want to combine. The fields are appended in the same order as selected on the user interface.

Custom: In this case, you can write your own logic to create the ID field. For example, if you wish to use a UUID key but want to prefix it with “HSBC”, you can write the logic in a Java class.

If you select this option, an additional field, “Class Name”, is displayed on the user interface, where you enter the fully qualified class name of your Java class.

You can download the sample project from the “Data Pipeline” landing page and refer to the Java class com.yourcompany.custom.keygen.SampleKeyGenerator to write the custom code.
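The three ID generator types can be pictured with the short Python sketch below. It only illustrates the logic: the product expects custom generators to be written as a Java class, and the function names, the absence of a separator between field values, and the use of a random (version 4) UUID are assumptions.

```python
import uuid
from typing import Mapping, Sequence


def uuid_id() -> str:
    """UUID: a universally unique identifier (random, version 4 here)."""
    return str(uuid.uuid4())


def field_values_id(record: Mapping, key_fields: Sequence[str]) -> str:
    """Field Values based: append the selected field values in order."""
    # Assumption: values are joined with no separator.
    return "".join(str(record[field]) for field in key_fields)


def custom_id(prefix: str = "HSBC") -> str:
    """Custom: for example, a UUID key prefixed with a fixed string."""
    return f"{prefix}{uuid.uuid4()}"


record = {"dept": "sales", "emp_no": 42}
print(field_values_id(record, ["dept", "emp_no"]))
print(custom_id())
```

For the sample record, selecting dept and then emp_no as key fields yields sales42, while reversing the selection order would yield 42sales.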

Emitter Output Fields: Output fields of the emitter.
Connection Retries: Number of retries for the component connection. Possible values are -1, 0, or a positive number. -1 denotes infinite retries.
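One way to read the Connection Retries values is sketched below. The helper `connect_with_retries` is hypothetical, and the interpretation that 0 means a single attempt with no retry is an assumption rather than documented behavior.

```python
import itertools


def connect_with_retries(connect, retries: int):
    """Call connect(); on failure retry up to `retries` times (-1: forever)."""
    if retries < -1:
        raise ValueError("retries must be -1, 0, or a positive number")
    # One initial attempt plus `retries` further attempts; -1 never gives up.
    attempts = itertools.count() if retries == -1 else range(retries + 1)
    last_error = None
    for _ in attempts:
        try:
            return connect()
        except ConnectionError as err:
            last_error = err
    raise last_error
```

A real implementation would normally wait between attempts; the sketch omits the delay to keep the retry counting easy to follow.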
Output Mode

The output mode to be used while writing the data to the streaming sink.

Append: Only the new rows in the streaming data are written to the sink.

Complete: All the rows in the streaming data are written to the sink every time there are updates.

Update: Only the rows that were updated in the streaming data are written to the sink every time there are updates.
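The difference between the three modes can be shown with a toy model in plain Python. This is not Spark code and does not reproduce Spark's exact semantics: it simply treats the streaming result as a keyed table and each micro-batch as a set of upserts, and the function name `rows_to_write` is hypothetical.

```python
def rows_to_write(mode: str, table: dict, batch: dict) -> dict:
    """Apply `batch` (key -> value upserts) to `table`; return the sink rows."""
    previous = dict(table)   # snapshot of the result before this micro-batch
    table.update(batch)      # result table after this micro-batch
    if mode == "complete":
        # Every row of the result table, changed or not.
        return dict(table)
    if mode == "update":
        # Rows that are new or whose value changed in this micro-batch.
        return {k: v for k, v in batch.items()
                if k not in previous or previous[k] != v}
    if mode == "append":
        # Only rows that did not exist before this micro-batch.
        return {k: v for k, v in batch.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode!r}")


batch = {"a": 2, "b": 1}
for mode in ("append", "complete", "update"):
    print(mode, rows_to_write(mode, {"a": 1, "c": 5}, batch))
```

Starting from a table holding a and c, the batch changes a and adds b: the toy append mode writes only b, update writes a and b, and complete writes a, b, and c.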

Enable Trigger: A trigger defines how frequently a streaming query is executed.
Processing Time

This field appears only when the Enable Trigger checkbox is selected.

Processing Time is the trigger time interval, in minutes or seconds.

ADD CONFIGURATION: Lets you add additional configuration properties of Elasticsearch.

Click on the Next button. Enter the notes in the space provided.

Click on the DONE button for saving the configuration.
