Saltearse al contenido

Crear un Procesador

Esta guía te llevará a través de configurar la plantilla básica para un nuevo procesador.

Ya has configurado tu entorno y tienes el SDK de Indexer aptos-indexer-sdk instalado. Si no, sigue la guía de instalación del SDK de Indexer.

Crear y ejecutar un procesador requerirá varias piezas:

  1. IndexerProcessorConfig
  2. ProcessorConfig
  3. El procesador mismo. Aquí es donde definirás la configuración de un procesador, la configuración del procesador, y los pasos que se ejecutarán para indexar transacciones.
  4. main.rs - El archivo principal que ejecutará el procesador.

La siguiente sección repasa cada una de estas piezas más explícitamente y proporciona ejemplos de código.

El IndexerProcessorConfig define la configuración base para todos los procesadores que estarás ejecutando. Debería incluir configuración para cosas que se comparten a través de múltiples procesadores, como la configuración de base de datos y configuración de Transaction Stream.

ServerArgs parsea un archivo config.yaml y bootstrapea un servidor con todas las piezas comunes para ejecutar un procesador.

Para configurar la configuración para tu procesador y hacerlo funcionar con ServerArgs, necesitarás definir un IndexerProcessorConfig que implemente el trait RunnableConfig. También activa un método run, que puede ser invocado en main.rs.

Para casos básicos, puedes copiar el IndexerProcessorConfig del repositorio aptos-indexer-processor-example y modificarlo para ajustarse a tus necesidades.

ProcessorConfig es un enum que contiene todas las configuraciones de procesadores individuales. Es usado por IndexerProcessorConfig.run() para mapear el nombre del procesador al ProcessorConfig correcto.

Puedes ver un ejemplo básico de un ProcessorConfig aquí. Un ejemplo de una configuración más compleja que incluye múltiples procesadores y configuraciones es aptos-indexer-processors.

Ahora que tienes las piezas de configuración configuradas, el siguiente paso es crear el procesador. El procesador está representado por una struct y usualmente se nombra {PROCESSOR_NAME}Processor, como EventsProcessor o TokenV2Processor, dependiendo del tipo de datos que está indexando.

pub struct EventsProcessor {
pub config: IndexerProcessorConfig,
pub db_pool: ArcDbPool,
}

El constructor del procesador debería definirse así:

pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
// Código de configuración del procesador aquí, si es necesario
}

Toma el IndexerProcessorConfig que has definido y realiza cualquier configuración requerida para instanciar el procesador. A continuación, tu procesador necesita implementar el ProcessorTrait.

#[async_trait::async_trait]
impl ProcessorTrait for EventsProcessor {
fn name(&self) -> &'static str {
self.config.processor_config.name()
}
async fn run_processor(&self) -> Result<()> {
// Lógica del procesador aquí
}
}

El método run_processor es el método más importante en el procesador.

Si estás usando una base de datos basada en migración, como PostgreSQL, ejecutar las migraciones puede ir dentro de run_processor. Aquí también implementamos lógica para determinar la versión de inicio apropiada para el procesador, verificar el ID de cadena usando Transaction Stream, y validar la configuración del procesador.

run_processor también contiene la instanciación de los Steps del procesador y la especificación de cómo estos Steps están conectados juntos por canales.

// Instanciar pasos del procesador
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;
// ... Instanciar el resto de los pasos de tu procesador ...
// Conectar pasos del procesador
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(extractor_step.into_runnable_step(), channel_size)
.connect_to(storer_step.into_runnable_step(), channel_size)
.connect_to(version_tracker_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);
// Leer los resultados de la salida del último paso
loop {
match buffer_receiver.recv().await {
// Hacer algo con la salida
}
}

Puedes ver un ejemplo completo de un procesador que indexa eventos crudos de Aptos en aptos-indexer-processor-example. Como referencia, también puedes ver todos los procesadores que componen la API de Indexer en aptos-indexer-processors.

Puedes copiar el archivo main.rs del aptos-indexer-processor-example.

Estas líneas de código usan los ServerArgs y el IndexerProcessorConfig que hemos definido anteriormente:

let args = ServerArgs::parse();
args.run::<IndexerProcessorConfig>(tokio::runtime::Handle::current())
.await