
Creating a Step

What is a step?

A step is a unit of processing logic in the SDK. You can use steps to define logic for extracting, transforming, or storing data. Steps are the building blocks of a processor.

There are two types of steps in the SDK:

  1. AsyncStep: Processes a batch of input items and returns a batch of output items.
  2. PollableAsyncStep: Processes batches like AsyncStep, but also periodically polls its internal state and returns a batch of output items when one is available.

How to create a Step

To create a step with the SDK, follow these instructions:

  1. Implement the Processable trait. This trait defines several important details about the step: the input and output types, the processing logic, and the run type (either AsyncRunType or PollableAsyncRunType).

    #[async_trait]
    impl Processable for MyExtractorStep {
        // The Input is a batch of Transaction
        type Input = Transaction;
        // The Output is a batch of MyData
        type Output = MyData;

        // Depending on the type of step this is, the RunType is either
        // - AsyncRunType
        // - PollableAsyncRunType
        type RunType = AsyncRunType;

        // Processes a batch of input items and returns a batch of output items.
        async fn process(
            &mut self,
            input: TransactionContext<Transaction>,
        ) -> Result<Option<TransactionContext<MyData>>, ProcessorError> {
            let transactions = input.data;
            let data: Vec<MyData> = transactions
                .iter()
                .map(|transaction| {
                    // Define the processing logic to extract MyData from a Transaction
                    todo!()
                })
                .collect();

            // Pass the batch metadata through unchanged.
            Ok(Some(TransactionContext {
                data,
                metadata: input.metadata,
            }))
        }
    }

    In the example code above, the input and output types are wrapped in a TransactionContext. TransactionContext carries metadata about the batch being processed, such as the transaction versions and timestamp, which is used for metrics and logging.

  2. Implement the NamedStep trait. The name it returns is used for logging.

    impl NamedStep for MyExtractorStep {
        fn name(&self) -> String {
            "MyExtractorStep".to_string()
        }
    }

  3. Implement either the AsyncStep trait or the PollableAsyncStep trait, which defines how the step is run in the processor.

    1. If you’re using AsyncStep, add this to your code:

      impl AsyncStep for MyExtractorStep {}

    2. If you’re creating a PollableAsyncStep, define the poll interval and what the step should do each time it polls.

      #[async_trait]
      impl<T: Send + 'static> PollableAsyncStep for MyPollStep<T>
      where
          Self: Sized + Send + Sync + 'static,
      {
          fn poll_interval(&self) -> std::time::Duration {
              // Define how often the step polls (1 second is a placeholder value)
              std::time::Duration::from_secs(1)
          }

          async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<T>>>, ProcessorError> {
              // Define what this step should do every time it polls.
              // Return Ok(Some(batches)) to emit output, or Ok(None) if there is nothing to emit.
              Ok(None)
          }
      }
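To make the data flow concrete, here is a std-only, synchronous model of the steps above. It is a simplified sketch, not the SDK's API: these Processable and NamedStep traits have no RunType, are not async, and return a String error instead of ProcessorError, and Metadata, Transaction, and MyData are hypothetical types. What it does show is the core pattern: a step maps a batch of inputs to a batch of outputs and carries the batch metadata through unchanged.

```rust
// Std-only, synchronous model of the step pattern. These traits and types are
// simplified stand-ins for illustration, NOT the SDK's definitions (the real
// traits are async, declare a RunType, and return ProcessorError).

/// Hypothetical batch metadata; a real context would also hold a timestamp.
#[derive(Debug, Clone, PartialEq)]
struct Metadata {
    start_version: u64,
    end_version: u64,
}

/// Hypothetical stand-in for TransactionContext: a batch plus its metadata.
#[derive(Debug, Clone, PartialEq)]
struct TransactionContext<T> {
    data: Vec<T>,
    metadata: Metadata,
}

trait NamedStep {
    fn name(&self) -> String;
}

trait Processable {
    type Input;
    type Output;
    fn process(
        &mut self,
        input: TransactionContext<Self::Input>,
    ) -> Result<Option<TransactionContext<Self::Output>>, String>;
}

/// Hypothetical input and output types.
#[derive(Debug, Clone, PartialEq)]
struct Transaction {
    version: u64,
    sender: String,
}

#[derive(Debug, Clone, PartialEq)]
struct MyData {
    version: u64,
    sender: String,
}

struct MyExtractorStep;

impl NamedStep for MyExtractorStep {
    fn name(&self) -> String {
        "MyExtractorStep".to_string()
    }
}

impl Processable for MyExtractorStep {
    type Input = Transaction;
    type Output = MyData;

    fn process(
        &mut self,
        input: TransactionContext<Transaction>,
    ) -> Result<Option<TransactionContext<MyData>>, String> {
        // Extract one MyData per transaction in the batch.
        let data = input
            .data
            .iter()
            .map(|txn| MyData { version: txn.version, sender: txn.sender.clone() })
            .collect();
        // Pass the batch metadata through unchanged for downstream metrics/logging.
        Ok(Some(TransactionContext { data, metadata: input.metadata }))
    }
}

fn main() {
    let mut step = MyExtractorStep;
    let batch = TransactionContext {
        data: vec![
            Transaction { version: 10, sender: "0x1".into() },
            Transaction { version: 11, sender: "0x2".into() },
        ],
        metadata: Metadata { start_version: 10, end_version: 11 },
    };
    let out = step.process(batch).unwrap().unwrap();
    println!(
        "{} produced {} items for versions {}..={}",
        step.name(),
        out.data.len(),
        out.metadata.start_version,
        out.metadata.end_version
    );
}
```

Returning Option mirrors the signature in the example above; in this model a step could return Ok(None) to emit nothing for a batch.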

How to connect steps

Now that you have created a step, you can connect it to other steps in the processor. ProcessorBuilder is used to connect a graph of steps to construct a processor. It uses trait bounds to ensure that the output type of each step matches the input type of its connected step.
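The compile-time type check can be illustrated with a small std-only sketch. Step, Chain, and connect are hypothetical names, and the real ProcessorBuilder works with async, channel-based steps; the only point here is the bound B: Step<Input = A::Output>, which makes connecting two steps with mismatched types a compile error rather than a runtime failure.

```rust
// Std-only sketch of type-checked step connection. `Step`, `Chain`, and
// `connect` are hypothetical; they are not the SDK's ProcessorBuilder API.

trait Step {
    type Input;
    type Output;
    fn run(&mut self, input: Vec<Self::Input>) -> Vec<Self::Output>;
}

/// Two steps joined so that `first`'s output feeds `second`.
struct Chain<A, B> {
    first: A,
    second: B,
}

/// The `B: Step<Input = A::Output>` bound enforces type matching:
/// connecting steps whose types disagree fails to compile.
fn connect<A, B>(first: A, second: B) -> Chain<A, B>
where
    A: Step,
    B: Step<Input = A::Output>,
{
    Chain { first, second }
}

/// A chain is itself a step, so chains can be connected further.
impl<A, B> Step for Chain<A, B>
where
    A: Step,
    B: Step<Input = A::Output>,
{
    type Input = A::Input;
    type Output = B::Output;
    fn run(&mut self, input: Vec<A::Input>) -> Vec<B::Output> {
        let mid = self.first.run(input);
        self.second.run(mid)
    }
}

/// Parses strings into numbers, dropping anything that does not parse.
struct ParseStep;
impl Step for ParseStep {
    type Input = String;
    type Output = u64;
    fn run(&mut self, input: Vec<String>) -> Vec<u64> {
        input.iter().filter_map(|s| s.parse().ok()).collect()
    }
}

/// Doubles each number.
struct DoubleStep;
impl Step for DoubleStep {
    type Input = u64;
    type Output = u64;
    fn run(&mut self, input: Vec<u64>) -> Vec<u64> {
        input.into_iter().map(|n| n * 2).collect()
    }
}

fn main() {
    let mut pipeline = connect(ParseStep, DoubleStep);
    let out = pipeline.run(vec!["1".to_string(), "x".to_string(), "21".to_string()]);
    println!("{:?}", out); // [2, 42]
    // connect(DoubleStep, ParseStep) would not compile: u64 != String.
}
```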

How to use ProcessorBuilder

  1. Initialize the processor with the first step using ProcessorBuilder::new_with_inputless_first_step.
  2. Connect the next step using .connect_to(second_step.into_runnable_step(), channel_size).
     • .into_runnable_step() converts your step into a RunnableStep, which stores the step’s input and output channels and allows the step to be spawned in a task.
     • When you call .connect_to, a channel of size channel_size is created to connect the previous step to the current one, and the previous step is spawned in a task.
  3. To close off the ProcessorBuilder, call .end_and_return_output_receiver(channel_size). This returns a channel receiver that can be used to receive the final output of the processor.

Example

let (pb, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
    first_step.into_runnable_step(),
)
.connect_to(second_step.into_runnable_step(), channel_size)
.connect_to(third_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);
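The wiring described in the list above can be approximated with standard-library threads and bounded channels. This is a model, not the SDK's implementation: threads stand in for async tasks, std::sync::mpsc::sync_channel stands in for the SDK's channels, and spawn_step and run_pipeline are hypothetical helpers. A producer thread plays the role of the inputless first step, and the caller drains the last receiver, much as it would with the receiver returned by end_and_return_output_receiver.

```rust
// Std-only model of connecting steps with bounded channels. Threads and
// std::sync::mpsc stand in for the SDK's async tasks and channels;
// `spawn_step` and `run_pipeline` are hypothetical helpers, not SDK functions.
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Spawns `step` in a thread that reads batches from `input` and forwards
/// each result on a new bounded channel, returning that channel's receiver.
fn spawn_step<I, O, F>(input: Receiver<Vec<I>>, channel_size: usize, mut step: F) -> Receiver<Vec<O>>
where
    I: Send + 'static,
    O: Send + 'static,
    F: FnMut(Vec<I>) -> Vec<O> + Send + 'static,
{
    let (tx, rx) = sync_channel(channel_size);
    thread::spawn(move || {
        // The loop ends once the upstream sender is dropped.
        for batch in input {
            // `send` blocks while the channel is full (backpressure).
            if tx.send(step(batch)).is_err() {
                break; // the downstream receiver was dropped
            }
        }
    });
    rx
}

fn run_pipeline(channel_size: usize) -> Vec<Vec<u64>> {
    // Inputless first step: a producer that emits three batches on its own.
    let (source_tx, source_rx) = sync_channel::<Vec<u64>>(channel_size);
    thread::spawn(move || {
        for start in [0u64, 3, 6] {
            source_tx.send((start..start + 3).collect()).unwrap();
        }
    });
    // Second step: double every item in the batch.
    let doubled = spawn_step(source_rx, channel_size, |b: Vec<u64>| -> Vec<u64> {
        b.into_iter().map(|n| n * 2).collect()
    });
    // Third step: reduce each batch to its sum.
    let output_receiver = spawn_step(doubled, channel_size, |b: Vec<u64>| -> Vec<u64> {
        vec![b.iter().sum()]
    });
    // The caller drains the final channel to get the processor's output.
    output_receiver.into_iter().collect()
}

fn main() {
    for batch in run_pipeline(2) {
        println!("{:?}", batch); // [6], then [24], then [42]
    }
}
```

In this model, a bounded channel makes a fast upstream step wait whenever the downstream step falls behind, so channel_size trades memory for how far ahead a step may run.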

Common steps

The SDK provides several common steps that you can use in your processor.

  1. TransactionStreamStep provides a stream of Aptos transactions to the processor.
  2. TimedBufferStep buffers batches of items and periodically polls to release them to the next step.
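The buffering behavior described for TimedBufferStep can be modeled in a few lines. TimedBuffer and its methods are hypothetical and synchronous; the SDK's step polls on its own schedule, whereas here the current time is passed to poll explicitly so the behavior is deterministic and easy to test.

```rust
// Std-only model of a timed buffer: items accumulate on each `process` call
// and are released when a poll finds the interval has elapsed. `TimedBuffer`
// and its methods are hypothetical, not the SDK's TimedBufferStep API.
use std::time::{Duration, Instant};

struct TimedBuffer<T> {
    buffer: Vec<T>,
    poll_interval: Duration,
    last_release: Instant,
}

impl<T> TimedBuffer<T> {
    fn new(poll_interval: Duration, now: Instant) -> Self {
        TimedBuffer { buffer: Vec::new(), poll_interval, last_release: now }
    }

    /// Buffers a batch; nothing is emitted downstream yet.
    fn process(&mut self, batch: Vec<T>) {
        self.buffer.extend(batch);
    }

    /// Releases everything buffered once `poll_interval` has passed since the
    /// last release; returns None if it is too early or the buffer is empty.
    fn poll(&mut self, now: Instant) -> Option<Vec<T>> {
        if now.duration_since(self.last_release) < self.poll_interval {
            return None;
        }
        self.last_release = now;
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

fn main() {
    let t0 = Instant::now();
    let mut buf = TimedBuffer::new(Duration::from_secs(1), t0);
    buf.process(vec![1, 2]);
    buf.process(vec![3]);
    // Too early: nothing is released.
    println!("{:?}", buf.poll(t0 + Duration::from_millis(500))); // None
    // Interval elapsed: the whole buffer is released as one batch.
    println!("{:?}", buf.poll(t0 + Duration::from_secs(1))); // Some([1, 2, 3])
}
```

Releasing the whole buffer as one batch lets several small upstream batches reach the next step together.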