Descriptor

A Descriptor acts as a template for Data Sets: it defines the properties and behaviours shared by all Data Sets associated with it. Each Data Set must be associated with exactly one Descriptor.
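To make the template relationship concrete, here is a minimal Java sketch. The `Descriptor` and `DataSet` records are hypothetical types invented for illustration and are not part of the Ingestr API. Shared settings such as the topic live on the descriptor, while each data set holds exactly one descriptor plus its own key values.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model for illustration only; these are NOT Ingestr types.
// The descriptor holds settings shared by every data set built from it.
record Descriptor(String id, String topic, List<String> dataSetKeys) {}

// A data set references exactly one descriptor and supplies values for its keys.
record DataSet(Descriptor descriptor, Map<String, String> keyValues) {
    DataSet {
        Objects.requireNonNull(descriptor, "a Data Set must have a Descriptor");
        keyValues = Map.copyOf(keyValues);
        // Every key declared by the descriptor must be supplied by the data set.
        for (String key : descriptor.dataSetKeys()) {
            if (!keyValues.containsKey(key)) {
                throw new IllegalArgumentException("missing Data Set Key: " + key);
            }
        }
    }

    // Shared properties are read from the descriptor rather than stored per data set.
    String topic() {
        return descriptor.topic();
    }
}

class DescriptorDemo {
    public static void main(String[] args) {
        Descriptor kline = new Descriptor("crypto-kline", "kline", List.of("currencyPair", "resolution"));
        DataSet btc = new DataSet(kline, Map.of("currencyPair", "BTC-USD", "resolution", "1m"));
        DataSet eth = new DataSet(kline, Map.of("currencyPair", "ETH-USD", "resolution", "1m"));
        System.out.println(btc.topic() + " " + eth.topic()); // prints "kline kline"
    }
}
```

Both data sets inherit the topic from the single descriptor they reference, so changing the descriptor changes every associated data set at once.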

The Descriptor describes the following properties and behaviours:

  • Topic - The name of the Kafka Topic this data should be ingested into

  • Data Set Key - The set of one or more keys used to uniquely identify a data partition, e.g. a table name or data set identifiers

  • Offset Key - For resumable Data Sets, the set of one or more keys that mark the point from which processing resumes on its next execution

  • Parameters - A set of configuration parameters that are supplied to the Ingestion Job on execution

  • Data Set Discoverer - A function that dynamically discovers new Data Sets in a source system that are eligible for ingestion

  • Partitioner - For large Data Sets, defines how data can be split for ingestion

  • Reprocessor - A function that determines the set of Offsets that need to be reprocessed
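The Offset Key idea can be sketched in plain Java. The `OffsetTracker` class below is hypothetical and is not how Ingestr stores offsets; it assumes a single timestamp offset (like `lastUpdate`) per data set and keeps it in memory, whereas a real job would persist it durably. Each run returns only records newer than the stored offset, then advances the offset.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of a timestamp Offset Key used to resume processing.
// NOT the Ingestr API; offsets are held in memory purely for illustration.
class OffsetTracker {
    private final Map<String, Instant> lastUpdate = new HashMap<>();

    // Returns records strictly after the stored offset (sorted), then advances the offset.
    List<Instant> nextBatch(String dataSetId, List<Instant> sourceRecords) {
        Instant from = lastUpdate.getOrDefault(dataSetId, Instant.MIN);
        List<Instant> batch = sourceRecords.stream()
                .filter(t -> t.isAfter(from))
                .sorted()
                .toList();
        if (!batch.isEmpty()) {
            lastUpdate.put(dataSetId, batch.get(batch.size() - 1));
        }
        return batch;
    }

    Optional<Instant> offset(String dataSetId) {
        return Optional.ofNullable(lastUpdate.get(dataSetId));
    }
}

class OffsetDemo {
    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        Instant t1 = Instant.parse("2024-01-01T00:00:00Z");
        Instant t2 = Instant.parse("2024-01-01T00:01:00Z");
        Instant t3 = Instant.parse("2024-01-01T00:02:00Z");
        System.out.println(tracker.nextBatch("BTC-USD/1m", List.of(t1, t2)).size());     // 2: first run reads everything
        System.out.println(tracker.nextBatch("BTC-USD/1m", List.of(t1, t2, t3)).size()); // 1: resumes after t2
    }
}
```

A Reprocessor, in this model, would amount to rewinding or supplying specific offsets so that already-processed ranges are read again.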

Definition

Below is a basic definition that configures a new Data Descriptor:

Ingestr.build()
    // ... (other definitions)

    // 1. Data Descriptor
    .dataDescriptor(newDataDescriptor("crypto-kline", "Crypto-Kline")
        // 2. Topic
        .topic("kline", 2)

        // 3. Partitions
        .addPartitionKey(
            newPartitionKey("currencyPair", FieldType.SINGLE, DataType.STRING)
                .label("Currency Pair"))
        .addPartitionKey(
            newPartitionKey("resolution", FieldType.SINGLE, DataType.STRING)
                .label("Resolution"))

        // 4. Offsets
        .addOffsetKey(newOffsetKey("lastUpdate", DataType.TIMESTAMP))

        // 5. Parameters
        .addParameter(
            newParameter("window", FieldType.SINGLE, DataType.INTEGER)
                .defaultValue("1"))

        // 6. Partition Registrator
        .partitionRegistrator(
            newPartitionRegister(CryptoPartitionRegistrator::new)
                .schedule("0/20 * * ? * *", "UTC")
                .deregistrationMethod(DeregistrationMethod.DEREGISTER))

        // 7. Reprocessor
        .reprocessor(
            newReprocessor(CryptoReprocessor::new)
                .parameterDescriptor(
                    newParameter("from", FieldType.SINGLE, DataType.DATE)
                        .label("From")
                        .build())
                .parameterDescriptor(
                    newParameter("to", FieldType.SINGLE, DataType.DATE)
                        .label("To")
                        .build()))
    )
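The Reprocessor above takes `from` and `to` DATE parameters. As a rough illustration of what such a function might compute, the hypothetical `DailyReprocessor` below (not `CryptoReprocessor` and not the Ingestr Reprocessor interface) turns an inclusive date range into the `lastUpdate` offsets to re-ingest, assuming one offset per UTC day; that granularity is an illustrative choice, not something the configuration specifies.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reprocessing logic for illustration only.
// Maps the "from"/"to" date parameters to one UTC-midnight offset per day, inclusive.
class DailyReprocessor {
    static List<Instant> offsetsToReprocess(LocalDate from, LocalDate to) {
        if (to.isBefore(from)) {
            throw new IllegalArgumentException("'to' must not be before 'from'");
        }
        List<Instant> offsets = new ArrayList<>();
        for (LocalDate day = from; !day.isAfter(to); day = day.plusDays(1)) {
            offsets.add(day.atStartOfDay(ZoneOffset.UTC).toInstant());
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Prints one instant per day from 2024-01-01 to 2024-01-03 inclusive.
        offsetsToReprocess(LocalDate.of(2024, 1, 1), LocalDate.of(2024, 1, 3))
                .forEach(System.out::println);
    }
}
```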
