Migrar al SDK de Indexer
Esta guía contiene instrucciones sobre cómo migrar tu procesador personalizado legacy (que está escrito de la forma antigua) al SDK de Indexer.
1. Clonar el repositorio de ejemplo
Sección titulada «1. Clonar el repositorio de ejemplo»Usamos el procesador de eventos de ejemplo en aptos-indexer-processor-example
como punto de partida para la migración.
git clone https://github.com/aptos-labs/aptos-indexer-processor-example.git
2. Migrar la configuración de tu procesador
Sección titulada «2. Migrar la configuración de tu procesador»Anteriormente, crearías una rama de aptos-indexer-processors
y actualizarías la configuración del procesador para incluir tu procesador personalizado.
Este enfoque legacy hacía muy difícil actualizar tu procesador.
Para abordar esto, el SDK ya no depende de aptos-indexer-processors
.
Como resultado, necesitarás definir tus propios structs IndexerProcessorConfig
y ProcessorConfig
.
El IndexerProcessorConfig
define la configuración base para todos los procesadores que estarás ejecutando.
El ProcessorConfig
es un enum que contiene todas las configuraciones de procesadores individuales.
Actualiza los siguientes archivos en tu proyecto:
ProcessorConfig
: ReemplazaEventsProcessor
con tu procesador.IndexerProcessorConfig
: Actualiza el método.run()
para incluir tu procesador.
Si te gustaría leer más sobre configuración en el SDK, echa un vistazo a la guía Crear un Procesador.
3. Migrar lógica de procesamiento a pasos
Sección titulada «3. Migrar lógica de procesamiento a pasos»En la forma antigua, definías la lógica de tu procesador implementando el método process_transactions
de ProcessorTrait
.
Procesador de eventos de ejemplo escrito con la forma antigua:
#[async_trait]impl ProcessorTrait for EventsProcessor { async fn process_transactions( ... ) -> anyhow::Result<ProcessingResult> { // Extraer eventos de transacciones let events: Vec<EventModel> = process_events(transactions);
// Almacenar los eventos en la base de datos let tx_result = insert_to_db( self.get_pool(), self.name(), start_version, end_version, &events, &self.per_table_chunk_sizes, ) .await;
return tx_result; }}
async fn insert_to_db( conn: ArcDbPool, name: &'static str, start_version: u64, end_version: u64, events: &[EventModel], per_table_chunk_sizes: &AHashMap<String, usize>,) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, start_version = start_version, end_version = end_version, "Inserting to db", ); execute_in_chunks( conn, insert_events_query, events, get_config_table_chunk_size::<EventModel>("events", per_table_chunk_sizes), ) .await?; Ok(())}
Con el SDK, hemos introducido el concepto de pasos, que representan unidades independientes de lógica de procesamiento.
En el ejemplo EventsProcessor
, la extracción de eventos y almacenarlos en la base de datos puede dividirse en dos pasos.
Para migrar tu procesador al SDK, necesitarás definir estos pasos en tu procesador.
Puedes usar los pasos EventsExtractor
y EventsStorer
en el ejemplo como punto de partida para definir tus propios pasos.
Haz los siguientes cambios a events_extractor.rs
.
// TODO: Actualizar el nombre del pasopub struct EventsExtractorwhere Self: Sized + Send + 'static, {}
#[async_trait]impl Processable for EventsExtractor { type Input = Vec<Transaction>; // TODO: Actualizar el tipo de salida // Esto debería ser el modelo de datos que estás extrayendo de las transacciones type Output = Vec<EventModel>; type RunType = AsyncRunType;
async fn process( &mut self, item: TransactionContext<Vec<Transaction>>, ) -> Result<Option<TransactionContext<Vec<EventModel>>>, ProcessorError> { // TODO: Actualizar lógica de extracción. // Esto debería ser la misma lógica de extracción en el método `process_transactions` anterior let events = item .data .par_iter() .map(|txn| { process_events(txn) }) .flatten() .collect::<Vec<EventModel>>();
Ok(Some(TransactionContext { data: events, metadata: item.metadata, })) }}
Haz los siguientes cambios a events_storer.rs
.
pub struct EventsStorerwhere Self: Sized + Send + 'static,{ conn_pool: ArcDbPool, processor_config: DefaultProcessorConfig,}
impl EventsStorer { pub fn new(conn_pool: ArcDbPool, processor_config: DefaultProcessorConfig) -> Self { Self { conn_pool, processor_config, } }}
#[async_trait]// TODO: Actualizar nombre del pasoimpl Processable for EventsStorer { // TODO: Actualizar tipo de entrada para el paso. // El tipo de entrada debería coincidir con el tipo de salida del paso extractor. type Input = Vec<EventModel>; type Output = (); type RunType = AsyncRunType;
async fn process( &mut self, events: TransactionContext<Vec<EventModel>>, ) -> Result<Option<TransactionContext<()>>, ProcessorError> { let per_table_chunk_sizes: AHashMap<String, usize> = AHashMap::new(); let execute_res = execute_in_chunks( self.conn_pool.clone(), // TODO: Actualizar esto a la consulta de inserción de tu procesador anterior insert_events_query, &events.data, get_config_table_chunk_size::<EventModel>("events", &per_table_chunk_sizes), ) .await; match execute_res { Ok(_) => { Ok(Some(TransactionContext { data: (), metadata: events.metadata, })) }, Err(e) => Err(ProcessorError::DBStoreError { message: format!( "Failed to store events versions {} to {}: {:?}", events.metadata.start_version, events.metadata.end_version, e, ), query: None, }), } }}
impl AsyncStep for EventsStorer {}
impl NamedStep for EventsStorer { fn name(&self) -> String { "EventsStorer".to_string() }}
4. Migrar tu procesador
Sección titulada «4. Migrar tu procesador»Ahora que hemos migrado la lógica de procesamiento a pasos, también necesitamos migrar el procesador para instanciar los pasos y conectarlos juntos.
En events_processor.rs
, haz los siguientes cambios:
// TODO: Actualizar nombre del procesadorpub struct EventsProcessor { pub config: IndexerProcessorConfig, pub db_pool: ArcDbPool, // Si tienes otros campos en tu procesador, agrégalos aquí // Puedes instanciarlos en consecuencia en el método `new` del procesador}
En el método run_processor
, necesitarás actualizar el código para usar los pasos que creaste en Paso 3.
pub async fn run_processor(self) -> Result<()> { {...}
// Definir pasos del procesador let transaction_stream_config = self.config.transaction_stream_config.clone(); let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), ..transaction_stream_config }) .await?; // TODO: Reemplazar las siguientes 2 líneas con tus pasos let events_extractor = EventsExtractor {}; let events_storer = EventsStorer::new(self.db_pool.clone());
let version_tracker = VersionTrackerStep::new( get_processor_status_saver(self.db_pool.clone(), self.config.clone()), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, );
// Conectar pasos del procesador juntos let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( transaction_stream.into_runnable_step(), ) // TODO: Reemplazar las siguientes 2 líneas con tus pasos .connect_to(events_extractor.into_runnable_step(), 10) .connect_to(events_storer.into_runnable_step(), 10) .connect_to(version_tracker.into_runnable_step(), 10) .end_and_return_output_receiver(10);
{...}}
5. Actualizar tu config.yaml
Sección titulada «5. Actualizar tu config.yaml»IndexerProcessorConfig
reelabora el formato del archivo config.yaml
.
Usa el config.yaml
de ejemplo.
health_check_port: 8085server_config: processor_config: # TODO: Actualizar con tipo de procesador type: "events_processor" transaction_stream_config: indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443" # TODO: Actualizar token de auth auth_token: "AUTH_TOKEN" # TODO: Actualizar con nombre del procesador request_name_header: "events-processor" db_config: # TODO: Actualizar con tu string de conexión de base de datos postgres_connection_string: postgresql://postgres:@localhost:5432/example # backfill_config: # backfill_alias: "events_processor_backfill_1"
6. Ejecutar tu procesador migrado
Sección titulada «6. Ejecutar tu procesador migrado»cd ~/{DIRECTORY_OF_PROJECT}/aptos-indexer-processor-examplecargo run --release -- -c config.yaml
En tu terminal, deberías empezar a ver logs como este:
{"timestamp":"2025-01-13T21:23:21.785452Z","level":"INFO","message":"[Transaction Stream] Successfully connected to GRPC stream","stream_address":"https://grpc.mainnet.aptoslabs.com/","connection_id":"ec67ecc4-e041-4f17-a2e2-441e7ff21487","start_version":2186504987,"filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/transaction-stream/src/transaction_stream.rs","line_number":349,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}{"timestamp":"2025-01-13T21:23:21.785664Z","level":"INFO","message":"Spawning polling task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}{"timestamp":"2025-01-13T21:23:21.785693Z","level":"INFO","message":"Spawning processing task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}{"timestamp":"2025-01-13T21:23:21.785710Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetExtractor","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}{"timestamp":"2025-01-13T21:23:21.785912Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetStorer","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}{"timestamp":"2025-01-13T21:23:21.785978Z","level":"INFO","message":"Spawning polling task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}{"timestamp":"2025-01-13T21:23:21.786018Z","level":"INFO","message":"Spawning processing task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}
7. Backfilling con el SDK
Sección titulada «7. Backfilling con el SDK»Con el SDK, hemos hecho algunas mejoras al proceso de backfilling. Hay dos opciones en backfilling:
- Puedes seguir siguiendo la forma antigua de backfilling, que es ejecutar una segunda instancia del procesador y actualizar
starting_version
a la versión de backfill. - El SDK introduce una mejora donde puedes rastrear progreso de un backfill e iniciar y parar el backfill según sea necesario.
Si te gustaría usar el nuevo proceso de backfilling, actualiza tu
config.yaml
así:
health_check_port: 8085server_config: processor_config: # TODO: Actualizar con tipo de procesador type: "events_processor" transaction_stream_config: indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443" # TODO: Actualizar con versión de backfill starting_version: {backfill version} # TODO: Actualizar token de auth auth_token: "AUTH_TOKEN" # TODO: Actualizar con nombre del procesador request_name_header: "events-processor" db_config: # TODO: Actualizar con tu string de conexión de base de datos postgres_connection_string: postgresql://postgres:@localhost:5432/example backfill_config: # TODO: Actualizar con tu alias de backfill. Esto debería ser único para cada backfill backfill_alias: "events_processor_backfill_1"