Creating a Step
What is a step?
A step is a unit of processing logic in the SDK; it can define logic for extracting, transforming, or storing data. Steps are the building blocks of a processor.
There are two types of steps in the SDK:
- `AsyncStep`: Processes a batch of input items and returns a batch of output items.
- `PollableAsyncStep`: Does the same as `AsyncStep`, but also periodically polls its internal state and returns a batch of output items if any are available.
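
As a rough illustration of the difference, the following std-only Rust sketch models both step kinds synchronously. It is not the SDK's API: `BatchStep`, `PollableBatchStep`, `SumStep`, and `drive` are hypothetical names, and the real SDK runs steps as async tasks. `SumStep` emits nothing from `process` and releases its running total from `poll`, which the driver calls whenever the poll interval has elapsed.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for `AsyncStep`: output only in response to an input batch.
trait BatchStep {
    type Input;
    type Output;
    fn process(&mut self, batch: Vec<Self::Input>) -> Option<Vec<Self::Output>>;
}

/// Hypothetical stand-in for `PollableAsyncStep`: can also emit output when polled.
trait PollableBatchStep: BatchStep {
    fn poll_interval(&self) -> Duration;
    fn poll(&mut self) -> Option<Vec<Self::Output>>;
}

/// Keeps a running total: `process` emits nothing, `poll` releases the total if it changed.
struct SumStep {
    total: u64,
    changed: bool,
}

impl BatchStep for SumStep {
    type Input = u64;
    type Output = u64;
    fn process(&mut self, batch: Vec<u64>) -> Option<Vec<u64>> {
        self.total += batch.iter().sum::<u64>();
        self.changed = true;
        None // hold output until the next poll
    }
}

impl PollableBatchStep for SumStep {
    fn poll_interval(&self) -> Duration {
        Duration::from_millis(10)
    }
    fn poll(&mut self) -> Option<Vec<u64>> {
        if !self.changed {
            return None;
        }
        self.changed = false;
        Some(vec![self.total])
    }
}

/// Hands each input batch to `process` and calls `poll` every `poll_interval`.
/// When the input channel closes, polls one last time and returns all output.
fn drive<S: PollableBatchStep>(step: &mut S, input: Receiver<Vec<S::Input>>) -> Vec<S::Output> {
    let mut out = Vec::new();
    let mut last_poll = Instant::now();
    loop {
        // Wait for input, but no longer than the time left until the next poll.
        let wait = step.poll_interval().saturating_sub(last_poll.elapsed());
        match input.recv_timeout(wait) {
            Ok(batch) => out.extend(step.process(batch).unwrap_or_default()),
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => {
                out.extend(step.poll().unwrap_or_default());
                return out;
            }
        }
        if last_poll.elapsed() >= step.poll_interval() {
            out.extend(step.poll().unwrap_or_default());
            last_poll = Instant::now();
        }
    }
}
```

The final poll on disconnect is a choice made in this sketch so that buffered state is not lost at shutdown.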
How to create a Step
To create a step with the SDK, follow these instructions:
- Implement the `Processable` trait. This trait defines several important details about the step: the input and output types, the processing logic, and the run type (either `AsyncRunType` or `PollableAsyncRunType`).

  ```rust
  use async_trait::async_trait;

  #[async_trait]
  impl Processable for MyExtractorStep {
      // The Input is a batch of Transaction
      type Input = Transaction;
      // The Output is a batch of MyData
      type Output = MyData;
      // Depending on the type of step this is, the RunType is either
      // - AsyncRunType
      // - PollableAsyncRunType
      type RunType = AsyncRunType;

      // Processes a batch of input items and returns a batch of output items.
      async fn process(
          &mut self,
          input: TransactionContext<Transaction>,
      ) -> Result<Option<TransactionContext<MyData>>, ProcessorError> {
          let transactions = input.data;
          let data = transactions
              .iter()
              .map(|transaction| {
                  // Define the processing logic to extract MyData from a Transaction.
                  // `todo!()` is a placeholder so the example type-checks.
                  todo!("extract MyData from the transaction")
              })
              .collect();
          Ok(Some(TransactionContext {
              data,
              metadata: input.metadata,
          }))
      }
  }
  ```

  In the example code above, the input and output types are wrapped in a `TransactionContext`. `TransactionContext` contains relevant metadata about the batch of data being processed, such as the transaction versions and timestamp, which is used for metrics and logging.
- Implement the `NamedStep` trait. This is used for logging.

  ```rust
  impl NamedStep for MyExtractorStep {
      fn name(&self) -> String {
          "MyExtractorStep".to_string()
      }
  }
  ```
- Implement either the `AsyncStep` trait or the `PollableAsyncStep` trait, which defines how the step will be run in the processor.
  - If you're using `AsyncStep`, add this to your code:

    ```rust
    impl AsyncStep for MyExtractorStep {}
    ```
  - If you're creating a `PollableAsyncStep`, you need to define the poll interval and what the step should do every time it polls.

    ```rust
    use std::time::Duration;

    use async_trait::async_trait;

    #[async_trait]
    impl<T: Send + 'static> PollableAsyncStep for MyPollStep<T>
    where
        Self: Sized + Send + Sync + 'static,
        T: Send + 'static,
    {
        fn poll_interval(&self) -> Duration {
            // Define the duration between polls (one second is only an illustrative value)
            Duration::from_secs(1)
        }

        async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<T>>>, ProcessorError> {
            // Define what this step should do every time it polls.
            // Optionally return a batch of output items; `Ok(None)` emits nothing.
            Ok(None)
        }
    }
    ```
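
The role of `TransactionContext` and `NamedStep` can be sketched without the SDK. In this simplified model, `Context`, `Metadata`, its fields (`start_version`, `end_version`, `timestamp_secs`), `ExtractLengths`, and `log_line` are all hypothetical; the SDK's real types and log format may differ. The step transforms only `data`, passes `metadata` through untouched, and its `name()` labels a log line.

```rust
/// Hypothetical stand-in for the batch metadata carried by `TransactionContext`.
#[derive(Clone, Debug, PartialEq)]
struct Metadata {
    start_version: u64,
    end_version: u64,
    timestamp_secs: u64,
}

/// Hypothetical stand-in for `TransactionContext<T>`: a batch plus its metadata.
#[derive(Debug, PartialEq)]
struct Context<T> {
    data: Vec<T>,
    metadata: Metadata,
}

/// Illustrative step that maps each input string to its length.
struct ExtractLengths;

impl ExtractLengths {
    /// Plays the role of `NamedStep::name`.
    fn name(&self) -> String {
        "ExtractLengths".to_string()
    }

    /// Plays the role of `Processable::process`: only `data` is transformed.
    fn process(&mut self, input: Context<String>) -> Option<Context<usize>> {
        let data: Vec<usize> = input.data.iter().map(|s| s.len()).collect();
        Some(Context { data, metadata: input.metadata })
    }
}

/// Builds the kind of log line a runner could emit from the step name and metadata.
fn log_line(name: &str, ctx: &Context<usize>) -> String {
    format!(
        "[{}] processed {} items, versions {}..={}",
        name,
        ctx.data.len(),
        ctx.metadata.start_version,
        ctx.metadata.end_version
    )
}
```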
How to connect steps
Now that you have created a step, you can connect it to other steps in the processor.
`ProcessorBuilder` is used to connect a graph of steps to construct a processor. It uses trait bounds to ensure that the output type of each step matches the input type of its connected step.
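
The type check that `ProcessorBuilder` performs through trait bounds can be illustrated with associated types. This is a hypothetical, synchronous sketch rather than the SDK's implementation: `Step`, `Pipeline`, `Parse`, and `Double` are made-up names. The bound `S: Step<Input = O>` on `connect_to` is what turns a mismatched connection into a compile error.

```rust
/// A step with declared input and output types.
trait Step {
    type Input;
    type Output;
    fn process(&mut self, input: Self::Input) -> Self::Output;
}

/// A chain of steps that turns an `I` into an `O`.
struct Pipeline<I, O> {
    f: Box<dyn FnMut(I) -> O>,
}

impl<I: 'static> Pipeline<I, I> {
    /// An empty pipeline that passes its input through unchanged.
    fn new() -> Self {
        Pipeline { f: Box::new(|x: I| x) }
    }
}

impl<I: 'static, O: 'static> Pipeline<I, O> {
    /// Only compiles when the next step's `Input` equals this pipeline's output `O`.
    fn connect_to<S>(mut self, mut step: S) -> Pipeline<I, S::Output>
    where
        S: Step<Input = O> + 'static,
    {
        Pipeline {
            f: Box::new(move |x: I| step.process((self.f)(x))),
        }
    }

    fn run(&mut self, input: I) -> O {
        (self.f)(input)
    }
}

/// Parses a decimal string, defaulting to 0 on bad input.
struct Parse;
impl Step for Parse {
    type Input = String;
    type Output = u32;
    fn process(&mut self, input: String) -> u32 {
        input.trim().parse().unwrap_or(0)
    }
}

/// Doubles a number.
struct Double;
impl Step for Double {
    type Input = u32;
    type Output = u32;
    fn process(&mut self, input: u32) -> u32 {
        input * 2
    }
}
```

For example, `Pipeline::<String, String>::new().connect_to(Double)` does not compile, because `Double` expects a `u32` while the pipeline currently produces a `String`.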
How to use ProcessorBuilder
- Initialize the processor with the first step using `ProcessorBuilder::new_with_inputless_first_step`.
- Connect the next step using `.connect_to(second_step.into_runnable_step(), channel_size)`.
  - `.into_runnable_step()` converts your step into a `RunnableStep`, which enables it to store the step's input and output channels and allows the step to be spawned in a task.
  - When you use `.connect_to`, a channel is created with size `channel_size` and connected to the previous and current steps, and the previous step is spawned in a task.
- To close off the `ProcessorBuilder`, use `.end_and_return_output_receiver(channel_size)`. This returns a channel receiver that can be used to receive the final output of the processor.
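
The channel-and-task wiring described above can be approximated with standard-library threads and bounded `sync_channel`s standing in for the SDK's async tasks and channels. `source` and `spawn_stage` are hypothetical helpers, and here each stage owns its output channel, which simplifies how `ProcessorBuilder` assigns channels. The bounded capacity `channel_size` provides backpressure: a fast upstream stage blocks on `send` once the channel is full.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Spawns `step` on its own thread, reading from `input` and writing to a new
/// bounded channel of `channel_size`; returns the receiving end for the next stage.
fn spawn_stage<I, O, F>(input: Receiver<I>, channel_size: usize, mut step: F) -> Receiver<O>
where
    I: Send + 'static,
    O: Send + 'static,
    F: FnMut(I) -> O + Send + 'static,
{
    let (tx, rx) = sync_channel(channel_size);
    thread::spawn(move || {
        // The loop ends when the upstream sender is dropped.
        for item in input {
            // Stop early if the downstream receiver has been dropped.
            if tx.send(step(item)).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which closes the channel and ends the next stage's loop.
    });
    rx
}

/// Source stage: emits `batches` into a bounded channel, like an inputless first step.
fn source<T: Send + 'static>(batches: Vec<T>, channel_size: usize) -> Receiver<T> {
    let (tx, rx) = sync_channel(channel_size);
    thread::spawn(move || {
        for batch in batches {
            if tx.send(batch).is_err() {
                break;
            }
        }
    });
    rx
}
```

In this model, the receiver returned by the last `spawn_stage` call plays the role of the receiver returned by `.end_and_return_output_receiver(channel_size)`.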
Example

```rust
let (pb, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
    first_step.into_runnable_step(),
)
.connect_to(second_step.into_runnable_step(), channel_size)
.connect_to(third_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);
```
Common steps
The SDK provides several common steps that you can use in your processor.
- `TransactionStreamStep` provides a stream of Aptos transactions to the processor.
- `TimedBufferStep` buffers a batch of items and periodically polls to release the items to the next step.
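
The behavior of `TimedBufferStep` can be modeled as a struct that only buffers in `process` and flushes from `poll` once an interval has elapsed. This is a hypothetical sketch, not the SDK's implementation: `TimedBuffer`, its fields, and the explicit `now: Instant` argument (used so the logic can be tested without sleeping) are assumptions.

```rust
use std::time::{Duration, Instant};

/// Hypothetical model of a timed buffer: `process` only buffers, `poll` flushes.
struct TimedBuffer<T> {
    items: Vec<T>,
    interval: Duration,
    last_flush: Instant,
}

impl<T> TimedBuffer<T> {
    fn new(interval: Duration, now: Instant) -> Self {
        TimedBuffer { items: Vec::new(), interval, last_flush: now }
    }

    /// Buffers a batch; nothing is passed downstream yet.
    fn process(&mut self, batch: Vec<T>) {
        self.items.extend(batch);
    }

    /// Releases all buffered items if `interval` has passed since the last flush
    /// and something is buffered; otherwise returns `None`.
    fn poll(&mut self, now: Instant) -> Option<Vec<T>> {
        if now.duration_since(self.last_flush) < self.interval || self.items.is_empty() {
            return None;
        }
        self.last_flush = now;
        Some(std::mem::take(&mut self.items))
    }
}
```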