Connecting Steps
Pre-requisite
At this point, you’d have already followed the Creating a Processor and Creating a Step guides. Our next goal is to put those two pieces together and connect steps within the processor.
How to connect steps
Now that you have created a step, you can connect it to other steps.
To do so, we use a builder class called ProcessorBuilder
to specify a sequence of steps that make up a processor.
- After you’ve instantiated your steps, you need to convert them into
RunnableStep
.RunnableStep
is a trait that wraps around a step. It provides the necessary input and output channels that feed into the step and allows the step to be spawned in a task. The SDK provides a helper function.into_runnable_step
to convert a step into aRunnableStep
. - Setup your first step with
ProcessorBuilder::new_with_inputless_first_step
. In almost all cases, the first step should be aTransactionStreamStep
. - Connect the previous step to the next step using
.connect_to
.connect_to
uses trait bounds to ensure at compile time that the output type of the previous step matches the input type of the next step. When calling.connect_to
, a channel gets created with sizechannel_size
and connects the previous and next steps. It also spawns a task that continuously loops the previous step — reading data from its input channel, processing the data, and sending the output to its output channel. - Repeat step 3 for each step in your processor.
- To close off the
ProcessorBuilder
, use.end_and_return_output_receiver
. This returns anInstrumentedAsyncReceiver
which you can use to process the output of the last step in the graph.
Here’s a simple example of connecting two steps:
let (processor_builder, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream_step.into_runnable_step(),
)
.connect_to(extractor_step.into_runnable_step(), 10)
.end_and_return_output_receiver(10);
Here’s a full example from aptos-indexer-processor-example
.
Visualizing the processor
As you connect steps, ProcessorBuilder
in the background is constructing a graphical representation of the steps in your processor using petgraph
.
You can see the visual representation of the graph by calling
let dot = processor_builder.graph.dot();
println!("{}", dot);
This will output a graph in the DOT language that you can visualize using tools like Graphviz.