Crear un Paso
¿Qué es un paso?
Sección titulada «¿Qué es un paso?»Un paso es una unidad de lógica de procesamiento en el SDK y puede usarse para definir lógica para la extracción, transformación o almacenamiento de datos. Los pasos son los bloques de construcción de un procesador. Los procesadores centrales de Aptos representan (1) obtener un flujo de transacciones de Transaction Stream, (2) extraer los datos, (3) escribir a una base de datos, y (4) rastrear el progreso, cada uno como pasos separados.
Hay dos tipos de pasos en el SDK:
- AsyncStep: Procesa un lote de elementos de entrada y devuelve un lote de elementos de salida.
- PollableAsyncStep: Hace lo mismo que
AsyncStep
, pero también sondea periódicamente su estado interno y devuelve un lote de elementos de salida si están disponibles.
Cómo crear un Paso
Sección titulada «Cómo crear un Paso»Para crear un paso con el SDK, sigue estas instrucciones:
-
Implementa el trait
Processable
. Este trait define varios detalles importantes sobre el paso: los tipos de entrada y salida, la lógica de procesamiento, y el tipo de ejecución (ya seaAsyncStepRunType
oPollableAsyncStepRunType
).#[async_trait]impl Processable for MyExtractorStep {// La Entrada es un vec de Transactiontype Input = Vec<Transaction>;// La Salida es un vec de MyDatatype Output = Vec<MyData>;// Dependiendo del tipo de paso que sea, el RunType es ya sea// - AsyncRunType// - PollableAsyncRunTypetype RunType = AsyncRunType;// Procesa un lote de elementos de entrada y devuelve un lote de elementos de salida.async fn process(&mut self,input: TransactionContext<Vec<Transaction>>,) -> Result<Option<TransactionContext<Vec<MyData>>>, ProcessorError> {let transactions = input.data;let data = transactions.iter().map(|transaction| {// Define la lógica de procesamiento para extraer MyData de una Transaction}).collect();Ok(Some(TransactionContext {data,metadata: input.metadata,}))}}En el código de ejemplo arriba, notarás que los tipos de entrada y salida están envueltos dentro de un
TransactionContext
.TransactionContext
contiene metadatos relevantes sobre el lote de datos siendo procesados, como las versiones de transacción y timestamp, y se usan para métricas y logging. -
Implementa el trait
NamedStep
. Esto se usa para logging.impl NamedStep for MyExtractorStep {fn name(&self) -> String {"MyExtractorStep".to_string()}} -
Implementa ya sea el trait
AsyncStep
o el traitPollableAsyncStep
, que define cómo el paso será ejecutado en el procesador.-
Si estás usando
AsyncStep
, agrega esto a tu código:impl AsyncStep for MyExtractorStep {} -
Si estás creando un
PollableAsyncStep
, necesitarás definir el intervalo de sondeo y qué debería hacer el paso cada vez que sondea.#[async_trait]impl<T: Send + 'static> PollableAsyncStep for MyPollStep<T>whereSelf: Sized + Send + Sync + 'static,T: Send + 'static,{fn poll_interval(&self) -> std::time::Duration {// Define duración}async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<T>>>, ProcessorError> {// Define código aquí sobre qué debería hacer este paso cada vez que sondea// Opcionalmente devuelve un lote de elementos de salida}}
-
Parsear Transacciones
Sección titulada «Parsear Transacciones»Al construir el paso extractor, necesitarás definir cómo quieres parsear tus datos de las transacciones. Lee más sobre cómo parsear tus datos de transacciones aquí.
Pasos comunes del SDK
Sección titulada «Pasos comunes del SDK»El SDK viene con un conjunto de pasos comunes que puedes usar para construir tu procesador.
TransactionStreamStep
proporciona un flujo de transacciones de Aptos al procesador. Lee más sobre esto aquí.TimedBufferStep
almacena en buffer un lote de elementos y sondea periódicamente para liberar los elementos al siguiente pasoVersionTrackerStep
rastrea el progreso del procesador y hace checkpoints del progreso del procesador. Lee más sobre esto aquí.OrderByVersionStep
ordena contextos de transacción por sus versiones de inicio. Almacena en buffer estos contextos ordenados y los libera en cada intervalo de sondeo.WriteRateLimitStep
limita el número de bytes escritos a la base de datos por segundo.