Version Tracking, Restart Behavior, and Backfilling
Version Tracking
The VersionTrackerStep is a common step in the SDK that checkpoints indexer progress. The struct requires an implementation of the ProcessorStatusSaver trait and can be used with the same .connect_to() method as other steps. After each successfully processed batch, the VersionTrackerStep calls the trait implementation of save_processor_status().
ProcessorStatusSaver
The ProcessorStatusSaver trait requires the implementation of the method save_processor_status with the following signature:
async fn save_processor_status(
    &self,
    last_success_batch: &TransactionContext<()>,
) -> Result<(), ProcessorError>;
This method is where the checkpoint write should be implemented. Examples can be found here. It is possible to checkpoint progress in different ways by using enums. This example inserts into Postgres using a simple processor_status model.
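As a rough illustration of what a checkpoint write does, the sketch below keeps the last successful version per processor in memory. It is not the SDK's API: BatchRange and InMemoryStatusStore are hypothetical stand-ins for TransactionContext metadata and a processor_status table, the method is synchronous for simplicity, and the rule that a checkpoint never moves backwards is an assumption about what a sensible store would enforce.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the version range carried by a processed batch.
pub struct BatchRange {
    pub start_version: u64,
    pub end_version: u64,
}

/// Hypothetical in-memory stand-in for a `processor_status` table,
/// keyed by processor name.
#[derive(Default)]
pub struct InMemoryStatusStore {
    rows: HashMap<String, u64>,
}

impl InMemoryStatusStore {
    /// Record the end of the last successful batch.
    /// Assumption: the checkpoint only ever moves forward.
    pub fn save_processor_status(&mut self, processor_name: &str, batch: &BatchRange) {
        let current = self.rows.entry(processor_name.to_string()).or_insert(0);
        *current = (*current).max(batch.end_version);
    }

    /// Return the saved checkpoint, or `None` if nothing was written yet.
    pub fn last_success_version(&self, processor_name: &str) -> Option<u64> {
        self.rows.get(processor_name).copied()
    }
}
```

A real implementation would perform the same comparison inside the database write, so that a late or replayed batch cannot overwrite a newer checkpoint.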
Restart Behavior
Once the processor writes its progress to the chosen store, it needs to retrieve the latest successful version from that store on restart. Here is an example of a get_starting_version() method that returns the latest processed version saved. The resulting starting_version: u64 can then be used as below. If there is no checkpoint, the processor will start from the beginning of the chain.
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
    starting_version: Some(starting_version),
    ..self.config.transaction_stream_config.clone()
})
.await?;
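The lookup that produces starting_version can be sketched as follows. This is a minimal sketch, not the referenced implementation: the status table is modelled as a plain HashMap (an assumption made so the example needs no database), and a missing checkpoint falls back to version 0, the beginning of the chain.

```rust
use std::collections::HashMap;

/// Return the version to start from for `processor_name`.
/// `status_table` is a hypothetical in-memory stand-in for the checkpoint store.
pub fn get_starting_version(status_table: &HashMap<String, u64>, processor_name: &str) -> u64 {
    status_table
        .get(processor_name)
        .copied()
        // No checkpoint yet: start from the beginning of the chain.
        .unwrap_or(0)
}
```

Whether to resume at the saved version or at the version after it depends on whether the processor's writes are idempotent; the sketch returns the saved version unchanged, matching the description above.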
Backfilling
To enable backfilling, IndexerProcessorConfig, ProcessorStatusSaver and get_starting_version() need some updates. Without these changes, it is difficult to run a live processor at the latest transaction version alongside a backfill processor.
Updates to Config
Add a field to your IndexerProcessorConfig for a BackfillConfig, which contains a single field, backfill_alias. The presence of this config determines whether the processor runs as a backfill processor or a regular one.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerProcessorConfig {
    pub processor_config: ProcessorConfig,
    pub transaction_stream_config: TransactionStreamConfig,
    pub db_config: DbConfig,
    pub backfill_config: Option<BackfillConfig>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackfillConfig {
    pub backfill_alias: String,
}
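One way to act on the optional config is to resolve it once into an explicit mode. The sketch below is illustrative only: ProcessorMode and processor_mode are hypothetical names, and BackfillConfig is redeclared without its serde derives so the example compiles on its own.

```rust
/// Redeclared here without serde derives so the sketch is self-contained.
#[derive(Clone, Debug)]
pub struct BackfillConfig {
    pub backfill_alias: String,
}

/// Hypothetical helper type: which kind of processor this instance is.
#[derive(Debug, PartialEq)]
pub enum ProcessorMode {
    /// Regular processor following the head of the chain.
    Head,
    /// Backfill processor identified by its alias.
    Backfill { alias: String },
}

/// Derive the mode from the optional `backfill_config` field.
pub fn processor_mode(backfill_config: Option<&BackfillConfig>) -> ProcessorMode {
    match backfill_config {
        Some(cfg) => ProcessorMode::Backfill {
            alias: cfg.backfill_alias.clone(),
        },
        None => ProcessorMode::Head,
    }
}
```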
Updates to config.yaml
Add the backfill_config section to server_config in your yaml file to set backfill_alias.
server_config:
  processor_config:
    type: "events_processor"
  ...
  backfill_config:
    backfill_alias: "events_processor_backfill"
Backfill Processor Status Table
Use a separate table for backfill processor status to avoid write conflicts. This table (backfill_processor_status_table) uses backfill_alias as the primary key instead of processor_name, which prevents conflicts with the main processor_status table when head and backfill processors run concurrently. For a faster backfill, run multiple backfill processors, each with a different backfill_alias and transaction version range.
Expand on this implementation. This model introduces a new state, BackfillStatus, which is either InProgress or Complete and determines the backfilling restart behavior.
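The shape of such a model might look like the sketch below. The row struct, its field names other than backfill_alias, and the backfill_row helper are assumptions made for illustration, as is the rule that a backfill becomes Complete once the last successful version reaches the end of its range.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackfillStatus {
    InProgress,
    Complete,
}

/// Hypothetical row for the backfill status table, keyed by `backfill_alias`.
#[derive(Clone, Debug)]
pub struct BackfillProcessorStatus {
    pub backfill_alias: String,
    pub backfill_status: BackfillStatus,
    pub last_success_version: u64,
    pub backfill_start_version: u64,
    pub backfill_end_version: u64,
}

/// Build the row to write after a successful batch.
/// Assumption: reaching `backfill_end_version` marks the backfill Complete.
pub fn backfill_row(
    backfill_alias: &str,
    backfill_start_version: u64,
    backfill_end_version: u64,
    last_success_version: u64,
) -> BackfillProcessorStatus {
    let backfill_status = if last_success_version >= backfill_end_version {
        BackfillStatus::Complete
    } else {
        BackfillStatus::InProgress
    };
    BackfillProcessorStatus {
        backfill_alias: backfill_alias.to_string(),
        backfill_status,
        last_success_version,
        backfill_start_version,
        backfill_end_version,
    }
}
```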
Updates to ProcessorStatusSaver
Expand your ProcessorStatusSaverEnum to include a Backfill variant that extracts the backfill_alias from the BackfillConfig, and the backfill_start_version and backfill_end_version from IndexerProcessorConfig.transaction_stream_config, like this. Update the corresponding write query to write to the new backfill_processor_status table.
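The routing this describes can be sketched with in-memory tables. Everything here is a simplified assumption rather than the linked implementation: the Head variant name, the Tables struct, and the synchronous method are hypothetical, and a real saver would issue database writes instead of HashMap inserts.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-ins for the two status tables.
#[derive(Default)]
pub struct Tables {
    pub processor_status: HashMap<String, u64>,
    pub backfill_processor_status: HashMap<String, u64>,
}

pub enum ProcessorStatusSaverEnum {
    /// Regular processor; `Head` is a hypothetical variant name.
    Head { processor_name: String },
    /// Backfill processor with its alias and version range.
    Backfill {
        backfill_alias: String,
        backfill_start_version: u64,
        backfill_end_version: u64,
    },
}

impl ProcessorStatusSaverEnum {
    /// Write the checkpoint to the table that matches the variant,
    /// so head and backfill processors never touch the same row.
    pub fn save_processor_status(&self, tables: &mut Tables, last_success_version: u64) {
        match self {
            Self::Head { processor_name } => {
                tables
                    .processor_status
                    .insert(processor_name.clone(), last_success_version);
            }
            Self::Backfill { backfill_alias, .. } => {
                tables
                    .backfill_processor_status
                    .insert(backfill_alias.clone(), last_success_version);
            }
        }
    }
}
```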
Updates to get_starting_version
Add a statement in your get_starting_version method to query the backfill_processor_status_table when the BackfillConfig field is present in IndexerProcessorConfig.
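The branching can be sketched as below, again with in-memory maps in place of real queries. The restart rules are assumptions chosen for illustration: an InProgress backfill resumes from its last successful version, a Complete backfill returns None to signal that there is nothing left to do, and a backfill with no row yet starts at its configured start version. BackfillRow and the parameter list are hypothetical.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackfillStatus {
    InProgress,
    Complete,
}

/// Hypothetical subset of a backfill status row.
pub struct BackfillRow {
    pub backfill_status: BackfillStatus,
    pub last_success_version: u64,
}

/// Pick the starting version; `None` means the backfill already finished.
pub fn get_starting_version(
    backfill_alias: Option<&str>,
    processor_name: &str,
    processor_status: &HashMap<String, u64>,
    backfill_processor_status: &HashMap<String, BackfillRow>,
    backfill_start_version: u64,
) -> Option<u64> {
    match backfill_alias {
        // Backfill processor: consult the backfill table only.
        Some(alias) => match backfill_processor_status.get(alias) {
            Some(row) if row.backfill_status == BackfillStatus::Complete => None,
            Some(row) => Some(row.last_success_version),
            None => Some(backfill_start_version),
        },
        // Regular processor: use the main table, or start from version 0.
        None => Some(processor_status.get(processor_name).copied().unwrap_or(0)),
    }
}
```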