Batch Emitter

The Batch emitter lets you process batch data with your own custom code, applying whatever logic you implement in the custom emitter.

Use this emitter when the emitter you need is not provided out of the box by Gathr.

For example, to store data on HDFS, you can write your own custom code that writes the data there.

Batch Emitter Configuration

To add a batch custom emitter to your pipeline, drag the custom emitter onto the canvas, connect it to a Data Source or processor, and right-click on it to configure.

Field Descriptions

Emitter Plugin

Provide the fully qualified name of a class that implements the Gathr CustomSSEmitter interface.

You can download a sample project from the Gathr UI (Data Pipeline page) and refer to the SampleCustomEmitter class.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

The path where the Spark application stores its checkpointing data.

For HDFS and EFS, enter a relative path such as /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path such as s3://BucketName/checkpointingDir.

Time-based Checkpoint

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e. on each run the checkpoint location provided above is appended with the current time in milliseconds.

Enable Trigger

A trigger defines how frequently the streaming query should be executed.

ADD CONFIGURATION

Enables you to configure additional properties.
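To illustrate the time-based checkpoint option above, here is a small plain-Java sketch of how a run-specific checkpoint path could be derived. The class name, method, and example path are assumptions for illustration only, not Gathr's actual implementation:

```java
public class CheckpointPathExample {
    // Appends the run's time in milliseconds to the base checkpoint
    // directory, mirroring the time-based checkpoint behaviour where
    // each pipeline run gets a fresh checkpoint location.
    public static String timeBasedPath(String baseDir, long timeMillis) {
        String normalized = baseDir.endsWith("/") ? baseDir : baseDir + "/";
        return normalized + timeMillis;
    }

    public static void main(String[] args) {
        // Hypothetical relative HDFS path; Gathr adds the prefix itself.
        System.out.println(timeBasedPath("/user/hadoop/checkpointDir", 1700000000000L));
        // → /user/hadoop/checkpointDir/1700000000000
    }
}
```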

Here is a small snippet of sample code that writes data to HDFS.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.StreamSinkProvider;
import org.apache.spark.sql.streaming.OutputMode;
import scala.collection.Seq;
import scala.collection.immutable.Map;

public class CustomSinkProvider implements StreamSinkProvider, DataSourceRegister {

    // Called by Spark to create the sink for a streaming query.
    public Sink createSink(SQLContext sqlContext, Map<String, String> options,
            Seq<String> partitionCols, OutputMode outputMode) {
        return new MySink("mysink1");
    }

    // Short name by which the sink is referenced, e.g. format("mysink").
    public String shortName() {
        return "mysink";
    }
}

public class MySink implements Sink {

    String message;

    public MySink(String msg) {
        message = msg;
    }

    // Writes each micro-batch out as JSON.
    public void addBatch(long batchId, Dataset<Row> dataset) {
        dataset.write().format("json")
                .option("path", "localhost:/user/sax/blankTest/custom/data")
                .save();
    }
}
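Note that Spark may re-deliver a micro-batch with a batchId that was already processed (for example after a restart), so a Sink's addBatch is expected to behave idempotently. A minimal plain-Java sketch of such a guard follows; the class and member names are illustrative, not part of the sample project:

```java
public class BatchGuard {
    private long lastCommittedBatchId = -1L;

    // Returns true when the batch should be processed, false when it
    // was already committed (e.g. replayed after a restart).
    public synchronized boolean shouldProcess(long batchId) {
        if (batchId <= lastCommittedBatchId) {
            return false;
        }
        lastCommittedBatchId = batchId;
        return true;
    }
}
```

An addBatch implementation would check shouldProcess(batchId) before writing, and simply return when it is false.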

Click on the Next button and enter notes in the space provided.

Click on the DONE button to save the configuration.
