Skip to content

Version Tracking

The VersionTrackerStep is a common step in the SDK method as other steps. Upon a successfully processed batch, the VersionTrackerStep will call the trait implementation of save_processor_status().

The ProcessorStatusSaver trait requires the implementation of the method save_processor_status with the following signature:

async fn save_processor_status(
&self,
last_success_batch: &TransactionContext<()>,
) -> Result<(), ProcessorError>;

This method is where checkpointing should be written. If you’re writing to Postgres, you can use the SDK’s Postgres implementation here. It is possible to checkpoint progress in different ways by using enums. The SDK’s Postgres implementation inserts using a simple processor_status model.

Now that the processor successfully writes to the chosen store for version tracking, upon restarting it needs to retrieve the latest successful version from that store. Here is an example of a get_starting_version() method that returns the latest processed version saved. This starting_version: u64 can then be used as below. If there is no checkpoint, the processor will start from the beginning of the chain.

let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;

The SDK does not provide an implementation of ProcessorStatusSaver that will save backfill progress. To enable saving backfill progress, IndexerProcessorConfig, ProcessorStatusSaver and get_starting_version() need some updates. Without these changes, it is difficult to run a live processor at the latest transaction version as well as a backfill processor.

Add an additional field on your IndexerProcessorConfig for a BackfillConfig. In this implementation, the BackfillConfig is part of an enum ProcessorMode that is used to determine the mode the processor is running in. In backfill mode, the processor starts from a different version and the progress is saved in a separate table.

Add the backfill_config section to server_config in your yaml file to set backfill_alias. Example

Use a separate table for backfill processor status to avoid write conflicts. This table (backfill_processor_status_table) uses backfill_alias as the primary key instead of processor_name to prevent conflicts with the main processor_status table when running head and backfill processors concurrently. Create multiple backfill processors with differing backfill_alias and transaction version ranges for a faster backfill. Expand on this implementation. This model introduces a new state, BackfillStatus, which is either InProgress or Complete which will determine the backfilling restart behavior.

Expand your ProcessorStatusSaver implementation to include a Backfill variant that extracts the backfill_alias from the BackfillConfig, and the backfill_start_version backfill_end_version from IndexerProcessorConfig.transaction_stream_config like this. Update the corresponding write query to write to the new backfill_processor_status table.

Add a statement in your get_starting_version method to query the backfill_processor_status_table when the BackfillConfig field is present in IndexerProcessorConfig .