Batch Emitter
The Batch Emitter lets you write your own custom code to process batch data according to the logic defined in the custom emitter.
Use this emitter when you need an emitter that Gathr does not provide.
For example, to store data in HDFS, you can write custom code that performs the write.
Batch Emitter Configuration
To add a batch custom emitter to your pipeline, drag the custom emitter onto the canvas, connect it to a Data Source or processor, and right-click on it to configure.
Field | Description |
---|---|
Emitter Plugin | Provide the fully qualified name of the class that implements the Gathr CustomSSEmitter interface. You can download a sample project from the Gathr UI (Data Pipeline page) and refer to it. |
Checkpoint Storage Location | Select the checkpointing storage location. Available options are HDFS, S3, and EFS. |
Checkpoint Connections | Select the connection. Connections are listed corresponding to the selected storage location. |
Checkpoint Directory | The path where the Spark application stores checkpointing data. For HDFS and EFS, enter a relative path. For S3, enter an absolute path. |
Time-based Check Point | Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., on each run the current time in milliseconds is appended to the checkpoint location provided above. |
Enable Trigger | Trigger defines how frequently a streaming query should be executed. |
ADD CONFIGURATION | Lets you configure additional properties. |
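To make the Time-based Check Point option more concrete, here is a minimal sketch in plain Java of what appending the current time in milliseconds to a checkpoint location could look like. The class `CheckpointPaths`, the method `withTimestamp`, and the base path `/checkpoints/myPipeline` are hypothetical names chosen for illustration. The sketch also assumes the timestamp is added as a child directory, and the exact format Gathr produces may differ.

```java
/** Hypothetical helper illustrating a time-based checkpoint location. */
final class CheckpointPaths {
    private CheckpointPaths() {}

    /**
     * Appends an epoch-millisecond timestamp to the base directory as a
     * child directory (an assumption; the product may use another format).
     */
    static String withTimestamp(String baseDir, long epochMillis) {
        // Drop one trailing slash so the result never contains "//".
        String trimmed = baseDir.endsWith("/")
                ? baseDir.substring(0, baseDir.length() - 1)
                : baseDir;
        return trimmed + "/" + epochMillis;
    }

    public static void main(String[] args) {
        // Each run gets a different location because the clock has advanced.
        System.out.println(withTimestamp("/checkpoints/myPipeline", System.currentTimeMillis()));
    }
}
```

Because every run resolves to a fresh location, a run started this way does not pick up checkpoint state left behind by an earlier run.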
The following sample code writes data to HDFS.
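// Imports assume a Spark build on Scala 2.12. Each public class belongs in its own .java file.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.StreamSinkProvider;
import org.apache.spark.sql.streaming.OutputMode;
import scala.collection.Seq;
import scala.collection.immutable.Map;

// Registers the custom sink under the short name "mysink".
public class CustomSinkProvider implements StreamSinkProvider, DataSourceRegister {
    public Sink createSink(SQLContext sqlContext, Map<String, String> options, Seq<String> partitionCols, OutputMode outputMode) {
        return new MySink("mysink1");
    }
    public String shortName() {
        return "mysink";
    }
}
// Writes each batch it receives as JSON files.
public class MySink implements Sink {
    String message;
    public MySink(String msg) {
        message = msg;
    }
    public void addBatch(long batchId, Dataset<Row> dataset) {
        // The default save mode fails if the target path already exists.
        dataset.write().format("json").option("path", "localhost:/user/sax/blankTest/custom/data").save();
    }
}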
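The `addBatch` method in the sample above receives a `batchId`. Under Spark's `Sink` contract, the same `batchId` can be delivered again after a failure, and its data should be added only once. The following is a minimal sketch, in plain Java with no Spark dependency, of how a sink might skip batches it has already written. `BatchTracker`, `shouldWrite`, and `markCommitted` are hypothetical names, and the sketch keeps its state in memory, whereas a real sink would need to persist the last committed id.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper that remembers the highest batch id already written. */
final class BatchTracker {
    // -1 means that no batch has been committed yet.
    private final AtomicLong lastCommitted = new AtomicLong(-1L);

    /** Returns true when the batch id is newer than anything committed so far. */
    boolean shouldWrite(long batchId) {
        return batchId > lastCommitted.get();
    }

    /** Records a successful write; the stored id never moves backwards. */
    void markCommitted(long batchId) {
        lastCommitted.accumulateAndGet(batchId, Math::max);
    }
}
```

Inside `addBatch`, a sink could call `shouldWrite(batchId)` before writing and `markCommitted(batchId)` after the write succeeds, so that a replayed batch is ignored.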
Click on the Next button. Enter the notes in the space provided.
Click on the DONE button to save the configuration.