Crear un Procesador
Esta guía te llevará a través de configurar la plantilla básica para un nuevo procesador.
Prerrequisitos
Sección titulada «Prerrequisitos»Ya has configurado tu entorno y tienes el SDK de Indexer aptos-indexer-sdk
instalado.
Si no, sigue la guía de instalación del SDK de Indexer.
Resumen
Sección titulada «Resumen»Crear y ejecutar un procesador requerirá varias piezas:
IndexerProcessorConfig
ProcessorConfig
- El procesador mismo. Aquí es donde definirás la configuración de un procesador, la configuración del procesador, y los pasos que se ejecutarán para indexar transacciones.
main.rs
- El archivo principal que ejecutará el procesador.
La siguiente sección repasa cada una de estas piezas más explícitamente y proporciona ejemplos de código.
Cómo definir IndexerProcessorConfig
Sección titulada «Cómo definir IndexerProcessorConfig»El IndexerProcessorConfig
define la configuración base para todos los procesadores que estarás ejecutando.
Debería incluir configuración para cosas que se comparten a través de múltiples procesadores, como la configuración de base de datos y configuración de Transaction Stream.
ServerArgs
parsea un archivo config.yaml
y bootstrapea un servidor con todas las piezas comunes para ejecutar un procesador.
Para configurar la configuración para tu procesador y hacerlo funcionar con ServerArgs
, necesitarás definir un IndexerProcessorConfig
que implemente el trait RunnableConfig
.
También activa un método run, que puede ser invocado en main.rs
.
Para casos básicos, puedes copiar el IndexerProcessorConfig
del repositorio aptos-indexer-processor-example
y modificarlo para ajustarse a tus necesidades.
Cómo definir ProcessorConfig
Sección titulada «Cómo definir ProcessorConfig»ProcessorConfig
es un enum que contiene todas las configuraciones de procesadores individuales.
Es usado por IndexerProcessorConfig.run()
para mapear el nombre del procesador al ProcessorConfig
correcto.
Puedes ver un ejemplo básico de un ProcessorConfig
aquí.
Un ejemplo de una configuración más compleja que incluye múltiples procesadores y configuraciones es aptos-indexer-processors
.
Cómo crear un procesador
Sección titulada «Cómo crear un procesador»Ahora que tienes las piezas de configuración configuradas, el siguiente paso es crear el procesador.
El procesador está representado por una struct y usualmente se nombra {PROCESSOR_NAME}Processor
, como EventsProcessor
o TokenV2Processor
, dependiendo del tipo de datos que está indexando.
pub struct EventsProcessor { pub config: IndexerProcessorConfig, pub db_pool: ArcDbPool,}
El constructor del procesador debería definirse así:
pub async fn new(config: IndexerProcessorConfig) -> Result<Self> { // Código de configuración del procesador aquí, si es necesario}
Toma el IndexerProcessorConfig
que has definido y realiza cualquier configuración requerida para instanciar el procesador.
A continuación, tu procesador necesita implementar el ProcessorTrait
.
#[async_trait::async_trait]impl ProcessorTrait for EventsProcessor { fn name(&self) -> &'static str { self.config.processor_config.name() }
async fn run_processor(&self) -> Result<()> { // Lógica del procesador aquí }}
El método run_processor
es el método más importante en el procesador.
Si estás usando una base de datos basada en migración, como PostgreSQL, ejecutar las migraciones puede ir dentro de run_processor
.
Aquí también implementamos lógica para determinar la versión de inicio apropiada para el procesador, verificar el ID de cadena usando Transaction Stream, y validar la configuración del procesador.
run_processor
también contiene la instanciación de los Step
s del procesador y la especificación de cómo estos Step
s están conectados juntos por canales.
// Instanciar pasos del procesadorlet transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), ..self.config.transaction_stream_config.clone()}).await?;// ... Instanciar el resto de los pasos de tu procesador ...
// Conectar pasos del procesadorlet (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( transaction_stream.into_runnable_step(),).connect_to(extractor_step.into_runnable_step(), channel_size).connect_to(storer_step.into_runnable_step(), channel_size).connect_to(version_tracker_step.into_runnable_step(), channel_size).end_and_return_output_receiver(channel_size);
// Leer los resultados de la salida del último pasoloop { match buffer_receiver.recv().await { // Hacer algo con la salida }}
Puedes ver un ejemplo completo de un procesador que indexa eventos crudos de Aptos en aptos-indexer-processor-example
.
Como referencia, también puedes ver todos los procesadores que componen la API de Indexer en aptos-indexer-processors
.
Cómo definir main.rs
Sección titulada «Cómo definir main.rs»Puedes copiar el archivo main.rs
del aptos-indexer-processor-example
.
Estas líneas de código usan los ServerArgs
y el IndexerProcessorConfig
que hemos definido anteriormente:
let args = ServerArgs::parse();args.run::<IndexerProcessorConfig>(tokio::runtime::Handle::current()) .await