Saltearse al contenido

Crear un Paso

Un paso es una unidad de lógica de procesamiento en el SDK y puede usarse para definir lógica para la extracción, transformación o almacenamiento de datos. Los pasos son los bloques de construcción de un procesador. Los procesadores centrales de Aptos representan (1) obtener un flujo de transacciones de Transaction Stream, (2) extraer los datos, (3) escribir a una base de datos, y (4) rastrear el progreso, cada uno como pasos separados.

Hay dos tipos de pasos en el SDK:

  1. AsyncStep: Procesa un lote de elementos de entrada y devuelve un lote de elementos de salida.
  2. PollableAsyncStep: Hace lo mismo que AsyncStep, pero también sondea periódicamente su estado interno y devuelve un lote de elementos de salida si están disponibles.

Para crear un paso con el SDK, sigue estas instrucciones:

  1. Implementa el trait Processable. Este trait define varios detalles importantes sobre el paso: los tipos de entrada y salida, la lógica de procesamiento, y el tipo de ejecución (ya sea AsyncStepRunType o PollableAsyncStepRunType).

    #[async_trait]
    impl Processable for MyExtractorStep {
    // La Entrada es un vec de Transaction
    type Input = Vec<Transaction>;
    // La Salida es un vec de MyData
    type Output = Vec<MyData>;
    // Dependiendo del tipo de paso que sea, el RunType es ya sea
    // - AsyncRunType
    // - PollableAsyncRunType
    type RunType = AsyncRunType;
    // Procesa un lote de elementos de entrada y devuelve un lote de elementos de salida.
    async fn process(
    &mut self,
    input: TransactionContext<Vec<Transaction>>,
    ) -> Result<Option<TransactionContext<Vec<MyData>>>, ProcessorError> {
    let transactions = input.data;
    let data = transactions.iter().map(|transaction| {
    // Define la lógica de procesamiento para extraer MyData de una Transaction
    }).collect();
    Ok(Some(TransactionContext {
    data,
    metadata: input.metadata,
    }))
    }
    }

    En el código de ejemplo arriba, notarás que los tipos de entrada y salida están envueltos dentro de un TransactionContext. TransactionContext contiene metadatos relevantes sobre el lote de datos siendo procesados, como las versiones de transacción y timestamp, y se usan para métricas y logging.

  2. Implementa el trait NamedStep. Esto se usa para logging.

    impl NamedStep for MyExtractorStep {
    fn name(&self) -> String {
    "MyExtractorStep".to_string()
    }
    }
  3. Implementa ya sea el trait AsyncStep o el trait PollableAsyncStep, que define cómo el paso será ejecutado en el procesador.

    1. Si estás usando AsyncStep, agrega esto a tu código:

      impl AsyncStep for MyExtractorStep {}
    2. Si estás creando un PollableAsyncStep, necesitarás definir el intervalo de sondeo y qué debería hacer el paso cada vez que sondea.

      #[async_trait]
      impl<T: Send + 'static> PollableAsyncStep for MyPollStep<T>
      where
      Self: Sized + Send + Sync + 'static,
      T: Send + 'static,
      {
      fn poll_interval(&self) -> std::time::Duration {
      // Define duración
      }
      async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<T>>>, ProcessorError> {
      // Define código aquí sobre qué debería hacer este paso cada vez que sondea
      // Opcionalmente devuelve un lote de elementos de salida
      }
      }

Al construir el paso extractor, necesitarás definir cómo quieres parsear tus datos de las transacciones. Lee más sobre cómo parsear tus datos de transacciones aquí.

El SDK viene con un conjunto de pasos comunes que puedes usar para construir tu procesador.

  1. TransactionStreamStep proporciona un flujo de transacciones de Aptos al procesador. Lee más sobre esto aquí.
  2. TimedBufferStep almacena en buffer un lote de elementos y sondea periódicamente para liberar los elementos al siguiente paso
  3. VersionTrackerStep rastrea el progreso del procesador y hace checkpoints del progreso del procesador. Lee más sobre esto aquí.
  4. OrderByVersionStep ordena contextos de transacción por sus versiones de inicio. Almacena en buffer estos contextos ordenados y los libera en cada intervalo de sondeo.
  5. WriteRateLimitStep limita el número de bytes escritos a la base de datos por segundo.