Creating a Processor
This guide will walk you through setting up the basic template for a new processor.
Pre-requisites
You’ve already set up your environment and have the Indexer SDK aptos-indexer-sdk
and aptos-indexer-sdk-server-framework
installed.
If you haven’t, follow the Indexer SDK installation guide.
Overview
Creating and running a processor will require several pieces:
IndexerProcessorConfig
ProcessorConfig
- The processor itself. This is where you’ll define a processor’s config, the processor setup, and the steps that will be run to index transactions.
main.rs
- The main file that will run the processor.
The next section goes through each of these pieces more explicitly and provides code examples.
How to define IndexerProcessorConfig
The IndexerProcessorConfig
defines the base configuration for all processors that you’ll be running.
It should include configuration for things that are shared across multiple processors, like the database configuration and Transaction Stream configuration.
ServerArgs
parses a config.yaml
file and bootstraps a server with all the common pieces to run a processor.
To setup the configuration for your processor and make it work with ServerArgs
, you’ll need to define a IndexerProcessorConfig
that implements the RunnableConfig
trait.
It also triggers a run method, which can be invoked in main.rs
.
For basic cases, you can copy the IndexerProcessorConfig
from the aptos-indexer-processor-example
repository and modify it to fit your needs.
How to define ProcessorConfig
ProcessorConfig
is an enum that contains all the individual processor configs.
It’s used by IndexerProcessorConfig.run()
to map the processor name to the right ProcessorConfig
.
You can see a basic example of a ProcessorConfig
here.
An example of a more complex setup that includes multiple processors and configurations is aptos-indexer-processors
.
How to create a processor
Now that you’ve got the configuration pieces set up, the next step is to create the processor.
The processor is represented by a struct and is usually named {PROCESSOR_NAME}Processor
, like EventsProcessor
or TokenV2Processor
, depending on the type of data it’s indexing.
pub struct EventsProcessor {
pub config: IndexerProcessorConfig,
pub db_pool: ArcDbPool,
}
The processor’s constructor should be defined like so:
pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
// Processor setup code here, if needed
}
It takes in the IndexerProcessorConfig
that you’ve defined and performs any setup required to instantiate the processor.
Next, your processor needs to implement the ProcessorTrait
.
#[async_trait::async_trait]
impl ProcessorTrait for EventsProcessor {
fn name(&self) -> &'static str {
self.config.processor_config.name()
}
async fn run_processor(&self) -> Result<()> {
// Processor logic here
}
}
The run_processor
method is the most important method in the processor.
If you’re using a migration-based database, like PostgreSQL, running the migrations can go inside of run_processor
.
This is also where we implement logic to determine the appropriate starting version for the processor, verify the chain ID using Transaction Stream, and validate the processor’s configuration.
run_processor
also contains the instantiation of the processor’s Step
s and the specification of how these Step
s are connected together by channels.
// Instantiate processor steps
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;
// ... Instantiate the rest of your processor's steps ...
// Connect processor steps
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(extractor_step.into_runnable_step(), channel_size)
.connect_to(storer_step.into_runnable_step(), channel_size)
.connect_to(version_tracker_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);
// Read the results from the output of the last step
loop {
match buffer_receiver.recv().await {
// Do something with th output
}
}
You can see a full example of a processor that indexes raw Aptos events in aptos-indexer-processor-example
.
As a reference, you can also see all of the processors that make up the Indexer API in aptos-indexer-processors
.
How to define main.rs
You may copy the main.rs
file from the aptos-indexer-processor-example
.
These lines of code uses the ServerArgs
and the IndexerProcessorConfig
that we’ve defined earlier:
let args = ServerArgs::parse();
args.run::<IndexerProcessorConfig>(tokio::runtime::Handle::current())
.await