Saltearse al contenido

Seguimiento de Versión

El VersionTrackerStep es un paso común en el SDK método como otros pasos. Tras un lote procesado exitosamente, el VersionTrackerStep llamará la implementación del trait de save_processor_status().

El trait ProcessorStatusSaver requiere la implementación del método save_processor_status con la siguiente firma:

async fn save_processor_status(
&self,
last_success_batch: &TransactionContext<()>,
) -> Result<(), ProcessorError>;

Este método es donde debería escribirse el checkpointing. Si estás escribiendo a Postgres, puedes usar la implementación Postgres del SDK aquí. Es posible hacer checkpoint del progreso de diferentes maneras usando enums. La implementación Postgres del SDK inserta usando un modelo simple processor_status.

Ahora que el procesador escribe exitosamente al almacén elegido para seguimiento de versión, al reiniciar necesita recuperar la última versión exitosa de ese almacén. Aquí hay un ejemplo de un método get_starting_version() que devuelve la última versión procesada guardada. Esta starting_version: u64 puede entonces usarse como abajo. Si no hay checkpoint, el procesador comenzará desde el inicio de la cadena.

let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;

El SDK no proporciona una implementación de ProcessorStatusSaver que guardará progreso de backfill. Para habilitar guardar progreso de backfill, IndexerProcessorConfig, ProcessorStatusSaver y get_starting_version() necesitan algunas actualizaciones. Sin estos cambios, es difícil ejecutar un procesador en vivo en la última versión de transacción así como un procesador de backfill.

Agregar un campo adicional en tu IndexerProcessorConfig para un BackfillConfig. En esta implementación, el BackfillConfig es parte de un enum ProcessorMode que se usa para determinar el modo en que el procesador está ejecutándose. En modo backfill, el procesador comienza desde una versión diferente y el progreso se guarda en una tabla separada.

Agrega la sección backfill_config a server_config en tu archivo yaml para establecer backfill_alias. Ejemplo

Usa una tabla separada para el estado del procesador de backfill para evitar conflictos de escritura. Esta tabla (backfill_processor_status_table) usa backfill_alias como la clave primaria en lugar de processor_name para prevenir conflictos con la tabla principal processor_status al ejecutar procesadores head y backfill concurrentemente. Crea múltiples procesadores de backfill con backfill_alias diferentes y rangos de versión de transacción para un backfill más rápido. Expande en esta implementación. Este modelo introduce un nuevo estado, BackfillStatus, que es ya sea InProgress o Complete que determinará el comportamiento de reinicio de backfilling.

Expande tu implementación ProcessorStatusSaver para incluir una variante Backfill que extrae el backfill_alias del BackfillConfig, y el backfill_start_version backfill_end_version de IndexerProcessorConfig.transaction_stream_config como esto. Actualiza la consulta de escritura correspondiente para escribir a la nueva tabla backfill_processor_status.

Agrega una declaración en tu método get_starting_version para consultar la backfill_processor_status_table cuando el campo BackfillConfig está presente en IndexerProcessorConfig.