Jobs

The implementation logic for performing an Ingestion is contained within a Job. Jobs come in two types:

  • Batch Jobs - Execute at set intervals according to a Schedule and/or via a Trigger Function, fetching a bounded amount of data with each execution.

  • Streaming Jobs - Execute continuously, streaming data as it changes/arrives and fetching an unbounded amount of data.

Batch Jobs

Batch Jobs are Ingestion processes that fetch a deterministic amount of data from a source at a scheduled time or when a defined Trigger fires.

The BatchJob Class

BatchJob is an abstract class that provides the basic Batch behaviours and expects the implementing class to provide the fetching logic.

The following code fragment provides the basic structure of a BatchJob:

@Slf4j
public class CryptoBatchJob extends BatchJob { //1.

    //2.
    @Override
    public IngestionResult ingest(IngestionRequest request) throws NoResultsFoundException {
        //... Logic to fetch

        // Result set from the source
        DataResult dr = Sql.query("jdbc://host/db", "select * from blah");

        // Transform the result set row by row, chunk it and write it out as compressed CSV
        dr = Transform.with(dr)
            .onRow(
                map(row -> {}),
                pipe(row -> {})
            )
            .chunk(500)
            .to(
                csv(
                    Options...
                ),
                compress(
                    Options...
                )
            );

        // Persist the transformed data to one or more destinations
        Store.save("s3://bucket/asdf", dr);
        Store.save("kafka://host/some_topic", ...);
        Store.save("https://storage.googleapis.com/bucket", ...);

        //3.
        return newIngestionResult(
                newOffsetKey(OffsetEntry.of("updatedAt", "2021-01-01T01:02:33.312Z")))
            .build();
    }
}
  1. BatchJob - The class extends the BatchJob class which forces us to implement the ingest method

  2. ingest() - This is where the work of fetching the data happens.

    • The IngestionRequest provides the needed context, which includes Parameters, Offsets, Schedule details, etc.

    • This method is invoked by the Ingestr Framework as required by the defined Schedules and/or Triggers.

  3. IngestionResult - The result of the Ingestion processing, which contains the ingested data and the offset.

IngestionRequest

The IngestionRequest contains all the relevant contextual information required for the Loader to perform its function. The Ingestr Framework is responsible for creating this request object based on the Schedule, the Trigger, or whether a reprocessing task has been initiated.
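
As an illustration only, the fragment below sketches how an implementing ingest() method might read that context. The accessor names used here (getParameters() and getOffsets()) are assumptions made for the sketch and may differ from the actual IngestionRequest API.

@Override
public IngestionResult ingest(IngestionRequest request) throws NoResultsFoundException {
    // Parameters supplied by the Schedule or Trigger that started this execution
    // (getParameters() is an assumed accessor name)
    Map<String, String> parameters = request.getParameters();

    // Offset recorded by the previous successful run, used to fetch only new data
    // (getOffsets() is likewise an assumed accessor name)
    String lastUpdatedAt = request.getOffsets()
        .getOrDefault("updatedAt", "1970-01-01T00:00:00Z");

    //... fetch rows newer than lastUpdatedAt, transform and store them as in
    //    the example above, and track the newest "updatedAt" value seen
    String newestUpdatedAt = lastUpdatedAt; // would be updated while fetching

    return newIngestionResult(
            newOffsetKey(OffsetEntry.of("updatedAt", newestUpdatedAt)))
        .build();
}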

The class contains the following Parameters

Ingestion Result

Schedule

The schedule determines when a batch job should be invoked and is defined by the fields below; an illustrative definition follows the list.

  • identifier/name - The unique identifier and name that should appear for this schedule

  • cron - Quartz style cron expression (with seconds granularity) e.g. "0 * * ? * *" would run every minute at 0 seconds past the minute.

  • timezone - The time zone the execution runs in, as validated by java.time.ZoneId.of("?") (e.g. "UTC")

  • parameters - The list of defined parameter fields that the BatchJob implementation expects to receive, which are especially relevant when executing on this schedule

  • partition filters - A list of filters that will be applied when determining which Data Partitions belong to this Schedule. (default: an empty list, which implies all Data Partitions)
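
The framework's mechanism for registering a schedule is not shown here; purely as an illustration of the fields above, a definition might look like the following, where Schedule.builder() and the setter names are assumptions rather than a documented API.

Schedule hourly = Schedule.builder()              // assumed builder-style API
    .identifier("crypto-prices-hourly")           // unique identifier/name
    .cron("0 0 * ? * *")                          // Quartz cron with seconds: top of every hour
    .timezone("UTC")                              // must be accepted by java.time.ZoneId.of(...)
    .parameters(Map.of("market", "BTC-USD"))      // parameter fields the BatchJob expects
    .partitionFilters(List.of())                  // empty list = all Data Partitions
    .build();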

StreamingJob

StreamingJob is an abstract class that provides the Streaming behaviours needed when implementing a Streaming Loader; an illustrative sketch follows the list below.

  • StreamingJob - The class extends the StreamingJob class which forces us to implement the ingest method with a Streaming Context

  • ingest() - This is where the work for fetching the data happens.

    • The StreamingJob runs in a dedicated thread, and the method is expected to run continuously in a while loop until it is shut down

  • IngestionResult - The result of the Ingestion processing, which contains the ingested data and the offset

    • The StreamingJobContext has an emitResult() method which should be used whenever data needs to be ingested
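
Putting those points together, a streaming implementation might be shaped roughly as follows. The ingest(StreamingJobContext) signature and the isRunning() shutdown check are assumptions made for this sketch based on the description above; only emitResult() is named in the text.

@Slf4j
public class CryptoStreamingJob extends StreamingJob {

    @Override
    public void ingest(StreamingJobContext context) {
        // Runs on a dedicated thread; loop until the framework signals shutdown
        // (isRunning() is an assumed name for that signal)
        while (context.isRunning()) {
            //... poll or subscribe to the source for newly arrived records
            DataResult dr = Sql.query("jdbc://host/db", "select * from blah");

            // Emit each result as data arrives instead of returning a single result
            context.emitResult(
                newIngestionResult(
                    newOffsetKey(OffsetEntry.of("updatedAt", "2021-01-01T01:02:33.312Z")))
                .build());
        }
    }
}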
