Creating a Processor

This guide will walk you through setting up the basic template for a new processor.

Pre-requisites

This guide assumes you’ve already set up your environment and installed the Indexer SDK crates, aptos-indexer-sdk and aptos-indexer-sdk-server-framework. If you haven’t, follow the Indexer SDK installation guide first.

Overview

Creating and running a processor will require several pieces:

  1. IndexerProcessorConfig
  2. ProcessorConfig
  3. The processor itself. This is where you’ll define a processor’s config, the processor setup, and the steps that will be run to index transactions.
  4. main.rs - The main file that will run the processor.

The following sections go through each of these pieces in more detail and provide code examples.

How to define IndexerProcessorConfig

The IndexerProcessorConfig defines the base configuration for all processors that you’ll be running. It should include configuration for things that are shared across multiple processors, like the database configuration and Transaction Stream configuration.

ServerArgs parses a config.yaml file and bootstraps a server with all the common pieces to run a processor.

To set up the configuration for your processor and make it work with ServerArgs, you’ll need to define an IndexerProcessorConfig that implements the RunnableConfig trait. The trait includes a run method, which is invoked from main.rs through ServerArgs.

For basic cases, you can copy the IndexerProcessorConfig from the aptos-indexer-processor-example repository and modify it to fit your needs.
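
The relationship between a shared config and a generic server entry point can be sketched with the standard library alone. This is a simplified, synchronous stand-in and not the SDK's API: the trait RunnableConfigSketch, the struct and field names, and run_server are all hypothetical, and the real RunnableConfig trait is async and lives in the SDK's server framework.

```rust
/// Simplified, synchronous stand-in for the SDK's `RunnableConfig` trait.
/// Hypothetical: it only mirrors the overall shape, not the real signature.
pub trait RunnableConfigSketch {
    fn run(&self) -> Result<String, String>;
}

/// Hypothetical database settings shared by every processor.
pub struct DbConfigSketch {
    pub connection_string: String,
}

/// Hypothetical Transaction Stream settings shared by every processor.
pub struct TransactionStreamConfigSketch {
    pub data_service_address: String,
    pub starting_version: Option<u64>,
}

/// Base configuration: the pieces that all processors have in common.
pub struct IndexerProcessorConfigSketch {
    pub processor_name: String,
    pub db_config: DbConfigSketch,
    pub transaction_stream_config: TransactionStreamConfigSketch,
}

impl RunnableConfigSketch for IndexerProcessorConfigSketch {
    fn run(&self) -> Result<String, String> {
        // Validate shared settings before starting anything.
        if self.db_config.connection_string.is_empty() {
            return Err("db_config.connection_string must not be empty".to_string());
        }
        // A real implementation would build the processor here and run it.
        Ok(format!(
            "running {} against {}",
            self.processor_name, self.transaction_stream_config.data_service_address
        ))
    }
}

/// Stand-in for the server bootstrap: generic over any runnable config.
pub fn run_server<C: RunnableConfigSketch>(config: &C) -> Result<String, String> {
    config.run()
}
```

Because run_server is generic over the trait, the same entry point works for any config type that implements it, which is the role the source describes for ServerArgs.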

How to define ProcessorConfig

ProcessorConfig is an enum that contains all the individual processor configs. It’s used by IndexerProcessorConfig.run() to map the processor name to the right ProcessorConfig.

You can see a basic example of a ProcessorConfig here. For a more complex setup that includes multiple processors and configurations, see the aptos-indexer-processors repository.
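
As a rough illustration of the enum-based approach, the sketch below wraps two per-processor configs in one enum and dispatches on the variant. The per-processor structs, their fields, the name strings, and describe_dispatch are hypothetical; in a real project the enum would typically also be deserialized from config.yaml.

```rust
/// Hypothetical per-processor settings; the field names are illustrative only.
#[derive(Debug, Clone, PartialEq)]
pub struct EventsProcessorConfig {
    pub channel_size: usize,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TokenV2ProcessorConfig {
    pub query_retries: u32,
}

/// One variant per processor, each wrapping that processor's own config.
#[derive(Debug, Clone, PartialEq)]
pub enum ProcessorConfig {
    EventsProcessor(EventsProcessorConfig),
    TokenV2Processor(TokenV2ProcessorConfig),
}

impl ProcessorConfig {
    /// Stable name that identifies the processor (the strings are assumptions).
    pub fn name(&self) -> &'static str {
        match self {
            ProcessorConfig::EventsProcessor(_) => "events_processor",
            ProcessorConfig::TokenV2Processor(_) => "token_v2_processor",
        }
    }
}

/// Stand-in for the dispatch done inside `IndexerProcessorConfig::run()`:
/// choose which processor to start based on the config variant.
pub fn describe_dispatch(config: &ProcessorConfig) -> String {
    match config {
        ProcessorConfig::EventsProcessor(c) => {
            format!("start EventsProcessor (channel_size = {})", c.channel_size)
        }
        ProcessorConfig::TokenV2Processor(c) => {
            format!("start TokenV2Processor (query_retries = {})", c.query_retries)
        }
    }
}
```

Matching on the enum means the compiler flags every dispatch site when a new processor variant is added.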

How to create a processor

Now that you’ve got the configuration pieces set up, the next step is to create the processor. The processor is represented by a struct and is usually named {PROCESSOR_NAME}Processor, like EventsProcessor or TokenV2Processor, depending on the type of data it’s indexing.

pub struct EventsProcessor {
    pub config: IndexerProcessorConfig,
    pub db_pool: ArcDbPool,
}

The processor’s constructor should be defined like so:

pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
    // Processor setup code here, if needed
}

It takes in the IndexerProcessorConfig that you’ve defined and performs any setup required to instantiate the processor. Next, your processor needs to implement the ProcessorTrait.

#[async_trait::async_trait]
impl ProcessorTrait for EventsProcessor {
    fn name(&self) -> &'static str {
        self.config.processor_config.name()
    }
 
    async fn run_processor(&self) -> Result<()> {
        // Processor logic here
    }
}

The run_processor method is the core of the processor: it performs the setup and runs the indexing pipeline.

If you’re using a migration-based database, like PostgreSQL, running the migrations can go inside of run_processor. This is also where we implement logic to determine the appropriate starting version for the processor, verify the chain ID using Transaction Stream, and validate the processor’s configuration.
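
The setup checks described above might look roughly like the following. Both helpers are hypothetical, and the precedence used to pick the starting version (resume after the last processed version, then fall back to the configured value, then to 0) is an assumption made for illustration, not confirmed SDK behavior.

```rust
/// Picks the version to start indexing from.
///
/// Assumed precedence (illustrative only):
/// 1. resume right after the last version recorded as processed, if any;
/// 2. otherwise use the starting version from the config, if set;
/// 3. otherwise start from version 0.
pub fn resolve_starting_version(
    last_processed_version: Option<u64>,
    configured_starting_version: Option<u64>,
) -> u64 {
    match (last_processed_version, configured_starting_version) {
        (Some(last), _) => last.saturating_add(1),
        (None, Some(configured)) => configured,
        (None, None) => 0,
    }
}

/// Fails fast when the chain ID reported by the stream does not match the
/// chain ID the processor previously indexed (hypothetical helper).
pub fn check_chain_id(expected: u64, reported: u64) -> Result<(), String> {
    if expected == reported {
        Ok(())
    } else {
        Err(format!(
            "chain ID mismatch: expected {}, stream reported {}",
            expected, reported
        ))
    }
}
```

Running these checks before any step is started keeps a misconfigured processor from writing data for the wrong chain or re-indexing versions it has already stored.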

run_processor also contains the instantiation of the processor’s Steps and the specification of how these Steps are connected together by channels.

// Instantiate processor steps
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
    starting_version: Some(starting_version),
    ..self.config.transaction_stream_config.clone()
})
.await?;
// ... Instantiate the rest of your processor's steps ...
 
// Connect processor steps
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
    transaction_stream.into_runnable_step(),
)
.connect_to(extractor_step.into_runnable_step(), channel_size)
.connect_to(storer_step.into_runnable_step(), channel_size)
.connect_to(version_tracker_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);
 
// Read the results from the output of the last step
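loop {
    match buffer_receiver.recv().await {
        Ok(_output) => {
            // Do something with the output
        },
        // The channel closes once the upstream steps finish, so stop reading
        Err(_) => break Ok(()),
    }
}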
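
To make the idea of steps connected by bounded channels more concrete, here is a standard-library analogy that uses threads and std::sync::mpsc instead of the SDK's async steps. It is not how ProcessorBuilder is implemented; connect_step and run_pipeline are hypothetical names, and the "extractor" and "storer" closures only imitate the roles of the steps above.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Spawns a step on its own thread: it reads items from `input`, applies `f`,
/// and forwards the results through a bounded channel of size `channel_size`.
fn connect_step<I, O, F>(input: Receiver<I>, channel_size: usize, f: F) -> Receiver<O>
where
    I: Send + 'static,
    O: Send + 'static,
    F: Fn(I) -> O + Send + 'static,
{
    let (sender, receiver) = sync_channel(channel_size);
    thread::spawn(move || {
        for item in input {
            // Stop early if the downstream step has gone away.
            if sender.send(f(item)).is_err() {
                break;
            }
        }
        // `sender` is dropped here, which closes the channel for the next step.
    });
    receiver
}

/// Builds a three-stage pipeline: source -> extractor -> storer.
pub fn run_pipeline(versions: Vec<u64>, channel_size: usize) -> Vec<String> {
    // Inputless first step: emits transaction versions.
    let (source_sender, source_receiver) = sync_channel(channel_size);
    thread::spawn(move || {
        for version in versions {
            if source_sender.send(version).is_err() {
                break;
            }
        }
    });

    // "Extractor": turns a version into a (version, event) record.
    let extracted = connect_step(source_receiver, channel_size, |version: u64| {
        (version, format!("event@{}", version))
    });
    // "Storer": pretends to persist the record and reports what it stored.
    let stored = connect_step(extracted, channel_size, |(version, event): (u64, String)| {
        format!("stored {} (version {})", event, version)
    });

    // Read the output of the last step until the channel closes.
    stored.into_iter().collect()
}
```

The bounded channels provide backpressure: a fast upstream step blocks once channel_size items are waiting, so a slow storer limits how far ahead the earlier steps can run.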

You can see a full example of a processor that indexes raw Aptos events in aptos-indexer-processor-example. As a reference, you can also see all of the processors that make up the Indexer API in aptos-indexer-processors.

How to define main.rs

You may copy the main.rs file from the aptos-indexer-processor-example.

These lines of code use the ServerArgs and the IndexerProcessorConfig that we defined earlier:

let args = ServerArgs::parse();
args.run::<IndexerProcessorConfig>(tokio::runtime::Handle::current())
    .await