Saltearse al contenido

Migrar al SDK de Indexer

Esta guía contiene instrucciones sobre cómo migrar tu procesador personalizado legacy (que está escrito de la forma antigua) al SDK de Indexer.

Usamos el procesador de eventos de ejemplo en aptos-indexer-processor-example como punto de partida para la migración.

Ventana de terminal
git clone https://github.com/aptos-labs/aptos-indexer-processor-example.git

2. Migrar la configuración de tu procesador

Sección titulada «2. Migrar la configuración de tu procesador»

Anteriormente, crearías una rama de aptos-indexer-processors y actualizarías la configuración del procesador para incluir tu procesador personalizado. Este enfoque legacy hacía muy difícil actualizar tu procesador. Para abordar esto, el SDK ya no depende de aptos-indexer-processors. Como resultado, necesitarás definir tus propios structs IndexerProcessorConfig y ProcessorConfig.

El IndexerProcessorConfig define la configuración base para todos los procesadores que estarás ejecutando. El ProcessorConfig es un enum que contiene todas las configuraciones de procesadores individuales.

Actualiza los siguientes archivos en tu proyecto:

Si te gustaría leer más sobre configuración en el SDK, echa un vistazo a la guía Crear un Procesador.

En la forma antigua, definías la lógica de tu procesador implementando el método process_transactions de ProcessorTrait.

Procesador de eventos de ejemplo escrito con la forma antigua:

#[async_trait]
impl ProcessorTrait for EventsProcessor {
async fn process_transactions(
...
) -> anyhow::Result<ProcessingResult> {
// Extraer eventos de transacciones
let events: Vec<EventModel> = process_events(transactions);
// Almacenar los eventos en la base de datos
let tx_result = insert_to_db(
self.get_pool(),
self.name(),
start_version,
end_version,
&events,
&self.per_table_chunk_sizes,
)
.await;
return tx_result;
}
}
async fn insert_to_db(
conn: ArcDbPool,
name: &'static str,
start_version: u64,
end_version: u64,
events: &[EventModel],
per_table_chunk_sizes: &AHashMap<String, usize>,
) -> Result<(), diesel::result::Error> {
tracing::trace!(
name = name,
start_version = start_version,
end_version = end_version,
"Inserting to db",
);
execute_in_chunks(
conn,
insert_events_query,
events,
get_config_table_chunk_size::<EventModel>("events", per_table_chunk_sizes),
)
.await?;
Ok(())
}

Con el SDK, hemos introducido el concepto de pasos, que representan unidades independientes de lógica de procesamiento. En el ejemplo EventsProcessor, la extracción de eventos y almacenarlos en la base de datos puede dividirse en dos pasos.

Para migrar tu procesador al SDK, necesitarás definir estos pasos en tu procesador. Puedes usar los pasos EventsExtractor y EventsStorer en el ejemplo como punto de partida para definir tus propios pasos.

Haz los siguientes cambios a events_extractor.rs.

// TODO: Actualizar el nombre del paso
pub struct EventsExtractor
where
Self: Sized + Send + 'static, {}
#[async_trait]
impl Processable for EventsExtractor {
type Input = Vec<Transaction>;
// TODO: Actualizar el tipo de salida
// Esto debería ser el modelo de datos que estás extrayendo de las transacciones
type Output = Vec<EventModel>;
type RunType = AsyncRunType;
async fn process(
&mut self,
item: TransactionContext<Vec<Transaction>>,
) -> Result<Option<TransactionContext<Vec<EventModel>>>, ProcessorError> {
// TODO: Actualizar lógica de extracción.
// Esto debería ser la misma lógica de extracción en el método `process_transactions` anterior
let events = item
.data
.par_iter()
.map(|txn| {
process_events(txn)
})
.flatten()
.collect::<Vec<EventModel>>();
Ok(Some(TransactionContext {
data: events,
metadata: item.metadata,
}))
}
}

Haz los siguientes cambios a events_storer.rs.

pub struct EventsStorer
where
Self: Sized + Send + 'static,
{
conn_pool: ArcDbPool,
processor_config: DefaultProcessorConfig,
}
impl EventsStorer {
pub fn new(conn_pool: ArcDbPool, processor_config: DefaultProcessorConfig) -> Self {
Self {
conn_pool,
processor_config,
}
}
}
#[async_trait]
// TODO: Actualizar nombre del paso
impl Processable for EventsStorer {
// TODO: Actualizar tipo de entrada para el paso.
// El tipo de entrada debería coincidir con el tipo de salida del paso extractor.
type Input = Vec<EventModel>;
type Output = ();
type RunType = AsyncRunType;
async fn process(
&mut self,
events: TransactionContext<Vec<EventModel>>,
) -> Result<Option<TransactionContext<()>>, ProcessorError> {
let per_table_chunk_sizes: AHashMap<String, usize> = AHashMap::new();
let execute_res = execute_in_chunks(
self.conn_pool.clone(),
// TODO: Actualizar esto a la consulta de inserción de tu procesador anterior
insert_events_query,
&events.data,
get_config_table_chunk_size::<EventModel>("events", &per_table_chunk_sizes),
)
.await;
match execute_res {
Ok(_) => {
Ok(Some(TransactionContext {
data: (),
metadata: events.metadata,
}))
},
Err(e) => Err(ProcessorError::DBStoreError {
message: format!(
"Failed to store events versions {} to {}: {:?}",
events.metadata.start_version, events.metadata.end_version, e,
),
query: None,
}),
}
}
}
impl AsyncStep for EventsStorer {}
impl NamedStep for EventsStorer {
fn name(&self) -> String {
"EventsStorer".to_string()
}
}

Ahora que hemos migrado la lógica de procesamiento a pasos, también necesitamos migrar el procesador para instanciar los pasos y conectarlos juntos. En events_processor.rs, haz los siguientes cambios:

// TODO: Actualizar nombre del procesador
pub struct EventsProcessor {
pub config: IndexerProcessorConfig,
pub db_pool: ArcDbPool,
// Si tienes otros campos en tu procesador, agrégalos aquí
// Puedes instanciarlos en consecuencia en el método `new` del procesador
}

En el método run_processor, necesitarás actualizar el código para usar los pasos que creaste en Paso 3.

pub async fn run_processor(self) -> Result<()> {
{...}
// Definir pasos del procesador
let transaction_stream_config = self.config.transaction_stream_config.clone();
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..transaction_stream_config
})
.await?;
// TODO: Reemplazar las siguientes 2 líneas con tus pasos
let events_extractor = EventsExtractor {};
let events_storer = EventsStorer::new(self.db_pool.clone());
let version_tracker = VersionTrackerStep::new(
get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
);
// Conectar pasos del procesador juntos
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
// TODO: Reemplazar las siguientes 2 líneas con tus pasos
.connect_to(events_extractor.into_runnable_step(), 10)
.connect_to(events_storer.into_runnable_step(), 10)
.connect_to(version_tracker.into_runnable_step(), 10)
.end_and_return_output_receiver(10);
{...}
}

IndexerProcessorConfig reelabora el formato del archivo config.yaml. Usa el config.yaml de ejemplo.

health_check_port: 8085
server_config:
processor_config:
# TODO: Actualizar con tipo de procesador
type: "events_processor"
transaction_stream_config:
indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
# TODO: Actualizar token de auth
auth_token: "AUTH_TOKEN"
# TODO: Actualizar con nombre del procesador
request_name_header: "events-processor"
db_config:
# TODO: Actualizar con tu string de conexión de base de datos
postgres_connection_string: postgresql://postgres:@localhost:5432/example
# backfill_config:
# backfill_alias: "events_processor_backfill_1"
Ventana de terminal
cd ~/{DIRECTORY_OF_PROJECT}/aptos-indexer-processor-example
cargo run --release -- -c config.yaml

En tu terminal, deberías empezar a ver logs como este:

Ventana de terminal
{"timestamp":"2025-01-13T21:23:21.785452Z","level":"INFO","message":"[Transaction Stream] Successfully connected to GRPC stream","stream_address":"https://grpc.mainnet.aptoslabs.com/","connection_id":"ec67ecc4-e041-4f17-a2e2-441e7ff21487","start_version":2186504987,"filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/transaction-stream/src/transaction_stream.rs","line_number":349,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785664Z","level":"INFO","message":"Spawning polling task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}
{"timestamp":"2025-01-13T21:23:21.785693Z","level":"INFO","message":"Spawning processing task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}
{"timestamp":"2025-01-13T21:23:21.785710Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetExtractor","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785912Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetStorer","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785978Z","level":"INFO","message":"Spawning polling task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}
{"timestamp":"2025-01-13T21:23:21.786018Z","level":"INFO","message":"Spawning processing task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}

Con el SDK, hemos hecho algunas mejoras al proceso de backfilling. Hay dos opciones en backfilling:

  1. Puedes seguir siguiendo la forma antigua de backfilling, que es ejecutar una segunda instancia del procesador y actualizar starting_version a la versión de backfill.
  2. El SDK introduce una mejora donde puedes rastrear progreso de un backfill e iniciar y parar el backfill según sea necesario. Si te gustaría usar el nuevo proceso de backfilling, actualiza tu config.yaml así:
health_check_port: 8085
server_config:
processor_config:
# TODO: Actualizar con tipo de procesador
type: "events_processor"
transaction_stream_config:
indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
# TODO: Actualizar con versión de backfill
starting_version: {backfill version}
# TODO: Actualizar token de auth
auth_token: "AUTH_TOKEN"
# TODO: Actualizar con nombre del procesador
request_name_header: "events-processor"
db_config:
# TODO: Actualizar con tu string de conexión de base de datos
postgres_connection_string: postgresql://postgres:@localhost:5432/example
backfill_config:
# TODO: Actualizar con tu alias de backfill. Esto debería ser único para cada backfill
backfill_alias: "events_processor_backfill_1"