Jobs
The implementation logic of performing the Ingestion is contained within a Job. Jobs come in two types:
Batch Jobs - Execute at set intervals according to a schedule and/or in response to a Trigger Function, fetching a bounded amount of data with each execution.
Streaming Jobs - Execute continuously, streaming data as it changes/arrives and fetching an unbounded amount of data.
Batch Jobs
Batch Jobs are Ingestion processes that fetch a deterministic amount of data from a source at a scheduled time or when a defined Trigger fires.
The BatchJob Class
The BatchJob is an abstract class that provides the basic Batch behaviours and expects the implementing class to provide the necessary fetching logic.
The following code fragment provides the basic structure of a BatchJob:
@Slf4j
public class CryptoBatchJob extends BatchJob { //1.
//2.
@Override
public IngestionResult ingest(IngestionRequest request) throws NoResultsFoundException {
//... Logic to fetch
//Result Set
        DataResult dr = Sql.query("jdbc://host/db", "select * from blah");
        dr = Transform.with(dr)
                .onRow(
                        map(row -> {}),
                        pipe(row -> {})
                )
                .chunk(500)
                .to(
                        csv(
                                Options...
                        ),
                        compress(
                                Options...
                        )
                );
        Store.save("s3://bucket/asdf", dr);
        Store.save("kafka://host/some_topic", ...);
        Store.save("https://storage.googleapis.com/bucket", ...);
//3.
return newIngestionResult(
newOffsetKey(OffsetEntry.of("updatedAt", "2021-01-01T01:02:33.312Z")))
.build();
}
}
1. BatchJob - The class extends the BatchJob class, which forces us to implement the ingest method.
2. ingest(IngestionRequest) - This is where the work of fetching the data happens. The IngestionRequest provides the needed context, which includes Parameters, Offsets, Schedule details etc. This method is invoked by the Ingestr Framework as required by the defined Schedules and/or Triggers (see the incremental-fetch sketch after this list).
3. IngestionResult - The result of the Ingestion processing, which contains the data that is needed and the offset.
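To make the flow above concrete, here is a minimal sketch of a Batch Job that resumes from the previous run. It assumes Lombok-style accessors on the request (getLastOffset(), getParameters()), a get(...) lookup on the Offset and Parameters types, and a parameter named initialWatermark; these names are illustrative assumptions rather than documented API, and the SQL and storage URI are placeholders in the same spirit as the example above.

@Slf4j
public class IncrementalCryptoBatchJob extends BatchJob {

    @Override
    public IngestionResult ingest(IngestionRequest request) throws NoResultsFoundException {
        // Resume from the offset stored after the last successful run, or fall back to a
        // configured starting point on the very first execution.
        // getLastOffset()/getParameters()/get(...) are assumed accessors, not confirmed API.
        String since = request.getLastOffset()
                .map(offset -> offset.get("updatedAt"))
                .orElse(request.getParameters().get("initialWatermark"));

        DataResult dr = Sql.query("jdbc://host/db",
                "select * from trades where updated_at > '" + since + "'");
        Store.save("s3://bucket/incremental", dr);

        // Return the new high-water mark so the framework can hand it back as lastOffset
        // on the next scheduled or triggered execution.
        return newIngestionResult(
                newOffsetKey(OffsetEntry.of("updatedAt", "2021-01-02T00:00:00.000Z")))
                .build();
    }
}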
IngestionRequest
The Ingestion Request contains all the relevant contextual information required for the Loader to perform its function. The Ingestr Framework is responsible for creating this request object based on the Schedule, the Trigger, and whether a reprocessing task has been initiated.
The class contains the following fields:
public class IngestionRequest {
/**
* If this ingestion was triggered by scheduling, this will be configured with the schedule that triggered it
*/
private Optional<IngestionSchedule> schedule;
/**
* The full definition of the originating Ingestion behind this request
*/
private Ingestion ingestion;
/**
* The full definition of the originating DataDescriptor behind this request
*/
private DataDescriptor dataDescriptor;
/**
* The Data Partition that is the subject of this request and the target of the Source
*/
private Partition partition;
/**
* (Optional) The previous offset that was stored of the last successful Ingestion that took place
*/
private Optional<Offset> lastOffset;
/**
* The set of Parameters and associated values after being merged
*/
private Parameters parameters;
/**
* Any arbitrary properties that may have been provided at execution time
*/
private Map<String, String> properties = new HashMap<>();
}
Ingestion Result
public class IngestionResult {
@NonNull
@Builder.Default
private final ZonedDateTime executionTimestamp = ZonedDateTime.now();
/**
* (Mandatory) Returns the offset of where we are up to with the data processing (inclusive)
*/
private Offset offset;
/**
 * (Optional) The InputStream containing the large payload. If Data Storage is configured, then the
 * loader will automatically store it in the Data Storage and send a Kafka topic message pointing to the location.
*/
private Optional<InputStream> rawLargeData;
/**
 * (Optional) The raw String payload to be sent to Kafka.
 * Either rawLargeData or data must be set.
*/
private Optional<String> data;
/**
* Specify any additional "Meta-Data" that should accompany the data collected which will be attached to the header
* information of the kafka messages
*/
@Singular(value = "meta")
private Map<String, String> meta;
/**
 * (Optional) Specify the key of this record as it goes into the Kafka Topic. This will influence the partitioning
 * of messages in the topic. If this is not set, then a null key (random partition assignment) will be used.
*/
private Optional<String> key;
}
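Putting these fields together, a result could be assembled as in the fragment below. It reuses the newIngestionResult(...) and newOffsetKey(...) helpers from the BatchJob example and assumes the Lombok builder setters implied by the field names and the @Singular annotation (data(...), meta(...), key(...)); treat those setter names as assumptions rather than confirmed API.

// Inside an ingest(...) implementation:
return newIngestionResult(
        newOffsetKey(OffsetEntry.of("updatedAt", "2021-01-01T01:02:33.312Z"))) // mandatory offset
        .data(Optional.of("{\"symbol\":\"BTC\",\"price\":42000}")) // small payload sent directly to Kafka
        .meta("source", "crypto-api")   // @Singular: each meta(...) call adds one Kafka header entry
        .meta("records", "1")
        .key(Optional.of("BTC"))        // keys the record, influencing Kafka partition assignment
        .build();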
Schedule
The schedule determines when a batch job should be invoked and is defined by:
identifier/name - The unique identifier and name that should appear for this schedule
cron - Quartz style cron expression (with seconds granularity) e.g. "0 * * ? * *" would run every minute at 0 seconds past the minute.
timezone - the time zone the execution should run in, as validated by java.time.ZoneId.of("?") (e.g. "UTC"); see the validation sketch after this list.
parameters - The list of defined parameter fields that the BatchJob class implementation expects to receive, which are especially relevant when executing on this schedule.
partition filters - A list of filters that will be applied when determining which Data Partitions belong to this Schedule. (default: empty list which implies all Data Partitions)
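Both the cron expression and the timezone can be checked with standard library calls before a schedule is registered. The snippet below is a small standalone illustration using Quartz and java.time; it is not part of the Ingestr Framework.

import java.time.ZoneId;
import org.quartz.CronExpression;

public class ScheduleValidation {
    public static void main(String[] args) {
        // Quartz-style cron with seconds granularity: fires at second 0 of every minute.
        String cron = "0 * * ? * *";
        boolean validCron = CronExpression.isValidExpression(cron);

        // Time zone as accepted by java.time.ZoneId.of(...), e.g. "UTC";
        // ZoneId.of throws a DateTimeException for unknown zone IDs.
        ZoneId zone = ZoneId.of("UTC");

        System.out.println("cron valid: " + validCron + ", zone: " + zone);
    }
}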
StreamingJob
The StreamingJob is an abstract class that provides the Streaming behaviours needed when implementing a Streaming Loader.
@Slf4j
public class CryptoStreamingJob extends StreamingJob { //1.
//2.
@Override
public void ingest(StreamingJobContext context) throws NoResultsFoundException {
while (isRunning()) {
            try {
                Thread.sleep(100); // simple throttle between fetch attempts
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt so shutdown is not swallowed
            }
//... Logic to fetch
//3.
            context.emitResult(newIngestionResult(
"<some-data>",
newOffsetKey(OffsetEntry.of("updatedAt", "2021-01-01T01:02:33.312Z")))
.build()
);
}
}
}
1. StreamingJob - The class extends the StreamingJob class, which forces us to implement the ingest method with a Streaming Context.
2. ingest() - This is where the work of fetching the data happens. The StreamingJob runs in a dedicated thread, and the method is expected to run continuously in a while loop until it is shut down.
3. IngestionResult - The result of the Ingestion processing, which contains the data that is needed and the offset. The StreamingJobContext has an emitResult() method which should be called whenever ingested data is ready to be emitted (a small polling sketch follows below).
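As a fuller (hypothetical) illustration of that loop, the sketch below polls a source, emits one result per record via emitResult(), and backs off briefly when nothing is available. The CryptoFeedClient and its poll() method are invented for the example and are not part of the framework; the newIngestionResult(data, offset) form matches the example above.

@Slf4j
public class PollingCryptoStreamingJob extends StreamingJob {

    // Hypothetical source client, invented for this sketch; not part of the Ingestr Framework.
    private final CryptoFeedClient feed = new CryptoFeedClient("wss://example.invalid/feed");

    @Override
    public void ingest(StreamingJobContext context) throws NoResultsFoundException {
        while (isRunning()) {
            // poll() is assumed to return the next record as a JSON string, or null when idle.
            String payload = feed.poll();
            if (payload == null) {
                try {
                    Thread.sleep(100); // back off briefly when the source has nothing new
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                continue;
            }
            // Emit each record as it arrives, together with the offset it advances the stream to.
            context.emitResult(newIngestionResult(
                    payload,
                    newOffsetKey(OffsetEntry.of("updatedAt", java.time.Instant.now().toString())))
                    .build());
        }
    }
}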