Seguimiento de Versión
Seguimiento de Versión
Sección titulada «Seguimiento de Versión»El VersionTrackerStep
es un paso común en el SDK método como otros pasos. Tras un lote procesado exitosamente, el VersionTrackerStep
llamará la implementación del trait de save_processor_status()
.
ProcessorStatusSaver
Sección titulada «ProcessorStatusSaver»El trait ProcessorStatusSaver
requiere la implementación del método save_processor_status
con la siguiente firma:
async fn save_processor_status( &self, last_success_batch: &TransactionContext<()>, ) -> Result<(), ProcessorError>;
Este método es donde debería escribirse el checkpointing.
Si estás escribiendo a Postgres, puedes usar la implementación Postgres del SDK aquí.
Es posible hacer checkpoint del progreso de diferentes maneras usando enums.
La implementación Postgres del SDK inserta usando un modelo simple processor_status
.
Comportamiento de Reinicio
Sección titulada «Comportamiento de Reinicio»Ahora que el procesador escribe exitosamente al almacén elegido para seguimiento de versión, al reiniciar necesita recuperar la última versión exitosa de ese almacén.
Aquí hay un ejemplo de un método get_starting_version()
que devuelve la última versión procesada guardada.
Esta starting_version: u64
puede entonces usarse como abajo.
Si no hay checkpoint, el procesador comenzará desde el inicio de la cadena.
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), ..self.config.transaction_stream_config.clone() }) .await?;
Backfilling
Sección titulada «Backfilling»El SDK no proporciona una implementación de ProcessorStatusSaver
que guardará progreso de backfill.
Para habilitar guardar progreso de backfill, IndexerProcessorConfig
, ProcessorStatusSaver
y get_starting_version()
necesitan algunas actualizaciones.
Sin estos cambios, es difícil ejecutar un procesador en vivo en la última versión de transacción así como un procesador de backfill.
Actualizaciones a Config
Sección titulada «Actualizaciones a Config»Agregar un campo adicional en tu IndexerProcessorConfig
para un BackfillConfig
.
En esta implementación, el BackfillConfig
es parte de un enum ProcessorMode
que se usa para determinar el modo en que el procesador está ejecutándose.
En modo backfill, el procesador comienza desde una versión diferente y el progreso se guarda en una tabla separada.
Actualizaciones a config.yaml
Sección titulada «Actualizaciones a config.yaml»Agrega la sección backfill_config
a server_config
en tu archivo yaml para establecer backfill_alias
. Ejemplo
Tabla de Estado de Procesador de Backfill
Sección titulada «Tabla de Estado de Procesador de Backfill»Usa una tabla separada para el estado del procesador de backfill para evitar conflictos de escritura. Esta tabla (backfill_processor_status_table
) usa backfill_alias
como la clave primaria en lugar de processor_name
para prevenir conflictos con la tabla principal processor_status
al ejecutar procesadores head y backfill concurrentemente.
Crea múltiples procesadores de backfill con backfill_alias
diferentes y rangos de versión de transacción para un backfill más rápido.
Expande en esta implementación.
Este modelo introduce un nuevo estado, BackfillStatus
, que es ya sea InProgress
o Complete
que determinará el comportamiento de reinicio de backfilling.
Actualizaciones a ProcessorStatusSaver
Sección titulada «Actualizaciones a ProcessorStatusSaver»Expande tu implementación ProcessorStatusSaver
para incluir una variante Backfill
que extrae el backfill_alias
del BackfillConfig
, y el backfill_start_version
backfill_end_version
de IndexerProcessorConfig.transaction_stream_config
como esto.
Actualiza la consulta de escritura correspondiente para escribir a la nueva tabla backfill_processor_status
.
Actualizaciones a get_starting_version
Sección titulada «Actualizaciones a get_starting_version»Agrega una declaración en tu método get_starting_version
para consultar la backfill_processor_status_table
cuando el campo BackfillConfig
está presente en IndexerProcessorConfig
.