
Quickstart Guide on Aptos Indexer SDK

What to expect from this guide

This guide will walk you through setting up and running a Rust processor to index events on the Aptos blockchain into PostgreSQL. We provide a template processor that you can customize to index events from your custom contracts. By the end of the guide, you should have a basic understanding of how a processor works and be able to customize the processor for your indexing needs.

Get started

To get started, clone the aptos-indexer-processor-sdk repo.

# HTTPS
git clone https://github.com/aptos-labs/aptos-indexer-processor-sdk.git
 
# SSH
git clone git@github.com:aptos-labs/aptos-indexer-processor-sdk.git

Processors consume transactions from the Transaction Stream Service. To use the Labs-Hosted Transaction Stream Service, you need an authorization token. Follow this guide to get a token from the Developer Portal. Create an API Key for Testnet, as this tutorial is for Testnet. Once you’re done, you should have a token that looks like this:

aptoslabs_yj4bocpaKy_Q6RBP4cdBmjA8T51hto1GcVX5ZS9S65dx

You also need the following tools:

We use PostgreSQL as our database and Diesel as our ORM in this tutorial. You’re free to use whatever you want, but this tutorial is geared towards PostgreSQL for the sake of simplicity. We use the following database configuration and tools:

PostgreSQL Installation (for macOS)

  1. brew install libpq (this is a postgres C API library). Also perform all export commands post-installation:
     export PATH="/opt/homebrew/opt/libpq/bin:$PATH"
     export LDFLAGS="-L/opt/homebrew/opt/libpq/lib"
     export CPPFLAGS="-I/opt/homebrew/opt/libpq/include"
  2. brew install postgres
  3. pg_ctl -D /opt/homebrew/var/postgres start or brew services start postgresql
  4. /opt/homebrew/bin/createuser -s postgres
  5. Ensure you’re able to do: psql postgres
  6. cargo install diesel_cli --no-default-features --features postgres
  7. Make sure that you’re in the DB folder (run cd src/db/postgres from base directory), then run diesel migration run --database-url postgresql://localhost/postgres
     a. If for some reason this database is already being used, try a different db, e.g. DATABASE_URL=postgres://postgres@localhost:5432/indexer_v2 diesel database reset
  • We will use a database hosted on localhost on the port 5432, which should be the default.
  • When you create your username, keep track of it and the password you use for it.
  • To easily view your database data, consider using a GUI like DBeaver (recommended), pgAdmin, or Postico.

Set up your environment

Make sure to start the postgresql service:

The command for Linux/WSL might be something like:

sudo service postgresql start

On macOS, if you’re using Homebrew, start it with:

brew services start postgresql

Configure your processor

Now let’s set up the configuration details for the actual indexer processor we’re going to use.

Set up your config.yaml file

In the example folder, there is a sample config.yaml file that should look something like this:

# This is a template yaml for the processor
health_check_port: 8085
server_config:
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.mainnet.aptoslabs.com:443"
    auth_token: "AUTH_TOKEN"
    request_name_header: "events-processor"
    starting_version: 0
  postgres_config:
    connection_string: postgresql://postgres:@localhost:5432/example

Open the config.yaml file and update these fields:

  • auth_token - the auth token you got from the Developer Portal
  • connection_string (under postgres_config) - connection string to your PostgreSQL database
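
As a rough illustration of how the connection string in the sample config is put together, the sketch below assembles one from its parts. The helper connection_string is hypothetical (it is not part of the SDK), and it assumes the common postgresql://user:password@host:port/database layout.

```rust
/// Hypothetical helper, not part of the SDK: builds a PostgreSQL connection
/// string of the form postgresql://user:password@host:port/database.
/// It does no escaping, so special characters in the password would need
/// percent-encoding before being passed in.
fn connection_string(user: &str, password: &str, host: &str, port: u16, database: &str) -> String {
    format!("postgresql://{user}:{password}@{host}:{port}/{database}")
}

fn main() {
    // Same parts as the sample config: user "postgres", empty password,
    // localhost on port 5432, database "example".
    let url = connection_string("postgres", "", "localhost", 5432, "example");
    println!("{url}");
}
```

With an empty password the colon before the @ remains, which is why the sample value reads postgres:@localhost.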

More customization with config.yaml

You can customize additional configuration with the config.yaml file.

To start at a specific ledger version, you can specify the version in the config.yaml file with:

starting_version: <Starting Version>

To stop processing at a specific ledger version, you can specify the ending version with:

request_ending_version: <Ending Version>

If you want to use a different network, change the indexer_grpc_data_service_address field to the corresponding desired value:

# Devnet
indexer_grpc_data_service_address: grpc.devnet.aptoslabs.com:443
 
# Testnet
indexer_grpc_data_service_address: grpc.testnet.aptoslabs.com:443
 
# Mainnet
indexer_grpc_data_service_address: grpc.mainnet.aptoslabs.com:443

This tutorial uses Testnet, so set indexer_grpc_data_service_address to grpc.testnet.aptoslabs.com:443.
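
If you generate config files for several environments, the network-to-address mapping above can be captured in a small lookup. This is only a sketch: the Network enum and both functions are hypothetical names, and the address strings are copied from the list above.

```rust
/// Hypothetical enum for the three networks listed in this guide.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Network {
    Devnet,
    Testnet,
    Mainnet,
}

/// Returns the value listed in this guide for indexer_grpc_data_service_address.
fn grpc_address(network: Network) -> &'static str {
    match network {
        Network::Devnet => "grpc.devnet.aptoslabs.com:443",
        Network::Testnet => "grpc.testnet.aptoslabs.com:443",
        Network::Mainnet => "grpc.mainnet.aptoslabs.com:443",
    }
}

/// Parses a network name case-insensitively; unknown names yield None.
fn parse_network(name: &str) -> Option<Network> {
    match name.to_ascii_lowercase().as_str() {
        "devnet" => Some(Network::Devnet),
        "testnet" => Some(Network::Testnet),
        "mainnet" => Some(Network::Mainnet),
        _ => None,
    }
}

fn main() {
    // This tutorial targets Testnet.
    let network = parse_network("testnet").expect("known network name");
    println!("indexer_grpc_data_service_address: {}", grpc_address(network));
}
```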

Create the events processor

At a high level, each processor is responsible for receiving a stream of transactions, parsing and transforming the relevant data, and storing the data into a database.
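
The receive, transform, and store stages can be pictured with a small standalone sketch. Everything in it is a simplified stand-in: Transaction, Event, transform, and store are hypothetical names rather than the SDK's types, and an in-memory Vec plays the role of the database.

```rust
/// Simplified stand-in for a transaction from the stream (not the SDK type).
struct Transaction {
    version: u64,
    event_types: Vec<String>,
}

/// Simplified stand-in for one row in the events table.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    transaction_version: u64,
    event_index: u64,
    type_: String,
}

/// Transform step: flatten each transaction into one row per event.
fn transform(transactions: &[Transaction]) -> Vec<Event> {
    transactions
        .iter()
        .flat_map(|txn| {
            txn.event_types.iter().enumerate().map(move |(i, t)| Event {
                transaction_version: txn.version,
                event_index: i as u64,
                type_: t.clone(),
            })
        })
        .collect()
}

/// Store step: append rows to an in-memory "table" and report how many were written.
fn store(table: &mut Vec<Event>, rows: Vec<Event>) -> usize {
    let written = rows.len();
    table.extend(rows);
    written
}

fn main() {
    // Receive step: a hard-coded batch stands in for the transaction stream.
    let batch = vec![
        Transaction { version: 10, event_types: vec!["0x1::coin::WithdrawEvent".to_string()] },
        Transaction { version: 11, event_types: vec![] },
    ];
    let mut table = Vec::new();
    let written = store(&mut table, transform(&batch));
    println!("stored {written} event(s)");
    for row in &table {
        println!("{} / {} / {}", row.transaction_version, row.event_index, row.type_);
    }
}
```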

Define the database schema

In src/db/migrations, you will see the events migration, which defines the database schema that will be used to store the events.

CREATE TABLE events (
    sequence_number BIGINT NOT NULL,
    creation_number BIGINT NOT NULL,
    account_address VARCHAR(66) NOT NULL,
    transaction_version BIGINT NOT NULL,
    transaction_block_height BIGINT NOT NULL,
    type TEXT NOT NULL,
    data JSONB NOT NULL,
    inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
    event_index BIGINT NOT NULL,
    indexed_type VARCHAR(300) NOT NULL,
    PRIMARY KEY (transaction_version, event_index)
);

When you apply migrations, Diesel regenerates the schema.rs file, which looks like this:

diesel::table! {
    events (transaction_version, event_index) {
        sequence_number -> Int8,
        creation_number -> Int8,
        #[max_length = 66]
        account_address -> Varchar,
        transaction_version -> Int8,
        transaction_block_height -> Int8,
        #[sql_name = "type"]
        type_ -> Text,
        data -> Jsonb,
        inserted_at -> Timestamp,
        event_index -> Int8,
        #[max_length = 300]
        indexed_type -> Varchar,
    }
}

In schema.rs, you’ll see two other important tables:

  • ledger_infos which tracks the chain id of the ledger being indexed
  • processor_status which tracks the last_success_version of the processor
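
To see why tracking last_success_version is useful, consider how a processor might decide where to resume after a restart. The sketch below shows one plausible policy and is an assumption, not the SDK's documented behavior: the function next_starting_version is hypothetical, and the real precedence between the stored status and the configured starting_version may differ.

```rust
/// Hypothetical resume policy, NOT taken from the SDK source:
/// 1. if a last_success_version was recorded, continue right after it;
/// 2. otherwise use the starting_version from config.yaml, if set;
/// 3. otherwise start from version 0.
fn next_starting_version(
    last_success_version: Option<u64>,
    config_starting_version: Option<u64>,
) -> u64 {
    match (last_success_version, config_starting_version) {
        (Some(last), _) => last + 1,
        (None, Some(start)) => start,
        (None, None) => 0,
    }
}

fn main() {
    // A processor that stored versions [0, 4999] would resume at 5000 under this policy.
    println!("{}", next_starting_version(Some(4999), Some(0)));
    // A fresh database falls back to the configured value.
    println!("{}", next_starting_version(None, Some(1_000_000)));
}
```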

Define the processing logic

The file src/main.rs contains the code which defines the events processor. The key components are:

  1. insert_events_query defines the diesel query to insert events into the database.

    fn insert_events_query(
        items_to_insert: Vec<EventModel>,
    ) -> impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send {
        use crate::schema::events::dsl::*;
        diesel::insert_into(crate::schema::events::table)
            .values(items_to_insert)
            .on_conflict((transaction_version, event_index))
            .do_nothing()
    }
  2. process is a helper function that wraps around a regular processor. Behind the scenes, it handles connecting to Transaction Stream, processing transactions with a transform function that you define, applying database migrations, and tracking the processor’s status.

    process(
        "events_processor".to_string(), // name of the processor that will be used to track the processor status
        MIGRATIONS, // migrations to be applied to the database
        async |transactions, conn_pool| {
            // transform from transaction to events and insert the events into the database
        },
    ).await?;
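
The on_conflict((transaction_version, event_index)).do_nothing() clause in insert_events_query makes inserts idempotent: replaying a batch that was already stored leaves the existing rows untouched. The sketch below imitates that behavior with an in-memory map keyed by the same pair. It illustrates the semantics only; insert_do_nothing and the Key alias are hypothetical, and no Diesel code is involved.

```rust
use std::collections::BTreeMap;

/// Composite key mirroring the table's primary key:
/// (transaction_version, event_index).
type Key = (i64, i64);

/// Inserts rows, skipping any whose key already exists, and returns the
/// number of rows actually inserted. Existing rows are never overwritten.
fn insert_do_nothing(table: &mut BTreeMap<Key, String>, rows: Vec<(Key, String)>) -> usize {
    let mut inserted = 0;
    for (key, data) in rows {
        if !table.contains_key(&key) {
            table.insert(key, data);
            inserted += 1;
        }
    }
    inserted
}

fn main() {
    let batch = vec![
        ((100, 0), "first event".to_string()),
        ((100, 1), "second event".to_string()),
    ];
    let mut table = BTreeMap::new();
    let first = insert_do_nothing(&mut table, batch.clone());
    // Replaying the same batch, for example after a restart, inserts nothing.
    let replay = insert_do_nothing(&mut table, batch);
    println!("first insert: {first} row(s), replay: {replay} row(s)");
}
```

This property is what makes it safe for a processor to reprocess a range of versions it has already handled.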

Run the processor

With the config.yaml you created earlier, you’re ready to run the events processor:

cd examples/postgres-basic-events-example
cargo run --release -- -c config.yaml

You should see the processor start to index Aptos blockchain events!

{"timestamp":"2024-08-15T01:06:35.169217Z","level":"INFO","message":"[Transaction Stream] Received transactions from GRPC.","stream_address":"https://grpc.testnet.aptoslabs.com/","connection_id":"5575cb8c-61fb-498f-aaae-868d1e8773ac","start_version":0,"end_version":4999,"start_txn_timestamp_iso":"1970-01-01T00:00:00.000000000Z","end_txn_timestamp_iso":"2022-09-09T01:49:02.023089000Z","num_of_transactions":5000,"size_in_bytes":5708539,"duration_in_secs":0.310734,"tps":16078,"bytes_per_sec":18371143.80788713,"filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e1e1bdd/rust/transaction-stream/src/transaction_stream.rs","line_number":400,"threadName":"tokio-runtime-worker","threadId":"ThreadId(6)"}
{"timestamp":"2024-08-15T01:06:35.257756Z","level":"INFO","message":"Events version [0, 4999] stored successfully","filename":"src/processors/events/events_storer.rs","line_number":75,"threadName":"tokio-runtime-worker","threadId":"ThreadId(10)"}
{"timestamp":"2024-08-15T01:06:35.257801Z","level":"INFO","message":"Finished processing events from versions [0, 4999]","filename":"src/processors/events/events_processor.rs","line_number":90,"threadName":"tokio-runtime-worker","threadId":"ThreadId(17)"}

Customize the processor

In most cases, you want to index events from your own contracts. The example processor offers a good starting point for creating your own custom processor.

To customize the processor to index events from your custom contract, you can make these changes:

  1. Change the database schema to a format that better matches your dapp or API.
     a. Create a new migration with diesel:

  diesel migration generate {migration_name}

     b. Add your migration changes to up.sql and down.sql, then apply the migration:

  diesel migration run --database-url={YOUR_DATABASE_URL}

     c. The schema.rs file will be updated automatically. You can then create a diesel query that uses the new schema.
  2. Update the transform logic in process(). You can filter by specific event types and extract specific event data from your custom contract.
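
Filtering by event type usually comes down to matching the event's type string, which has the form address::module::EventName. The sketch below keeps only events emitted by one module. RawEvent, is_from_module, and filter_events are hypothetical names, 0xcafe::marketplace is a made-up contract, and the comparison is a plain string match that does not normalize addresses (for example, leading zeros).

```rust
/// Simplified stand-in for an on-chain event; not the SDK's real type.
#[derive(Debug, Clone, PartialEq)]
struct RawEvent {
    type_str: String,
    data: String,
}

/// Returns true when the type string looks like address::module::Name
/// with the given address and module. Generic parameters after the
/// event name are left untouched by splitting into at most three parts.
fn is_from_module(type_str: &str, address: &str, module: &str) -> bool {
    let mut parts = type_str.splitn(3, "::");
    parts.next() == Some(address) && parts.next() == Some(module) && parts.next().is_some()
}

/// Keeps only the events emitted by the given module.
fn filter_events(events: Vec<RawEvent>, address: &str, module: &str) -> Vec<RawEvent> {
    events
        .into_iter()
        .filter(|e| is_from_module(&e.type_str, address, module))
        .collect()
}

fn main() {
    let events = vec![
        RawEvent { type_str: "0x1::coin::WithdrawEvent".to_string(), data: "{}".to_string() },
        RawEvent {
            type_str: "0xcafe::marketplace::ListingCreated".to_string(),
            data: "{\"price\":\"100\"}".to_string(),
        },
    ];
    for event in filter_events(events, "0xcafe", "marketplace") {
        println!("{} -> {}", event.type_str, event.data);
    }
}
```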

Migrate from legacy processors

If you’re migrating from the legacy processors, you can still start with the same steps above to create a new processor with the Indexer SDK.

You’ll also need to follow these steps:

  1. Copy your migration files to src/db/.
  2. With the legacy processors, the processing logic is defined inside the process_transactions method.
// Example with the legacy processors
#[async_trait]
impl ProcessorTrait for EventsProcessor {
    async fn process_transactions(
        ...
    ) -> anyhow::Result<ProcessingResult> {
        // Extract events from transactions 
        let events: Vec<EventModel> = process_events(transactions);
 
        // Store the events in the database
        let tx_result = insert_to_db(
            self.get_pool(),
            self.name(),
            start_version,
            end_version,
            &events,
            &self.per_table_chunk_sizes,
        )
        .await;
 
        return tx_result;
    }
}

Migrate to the SDK by copying over the logic in process_transactions method to the SDK process transform function.

// Example with SDK processor
    process(
        "events_processor".to_string(),
        MIGRATIONS,
        async |transactions, conn_pool| {
          // Extract events from transactions 
          let events: Vec<EventModel> = process_events(transactions);
 
          // Store events in the database
          let execute_res = execute_in_chunks(
              conn_pool.clone(),
              insert_events_query,
              &events,
              MAX_DIESEL_PARAM_SIZE / EventModel::field_count(),
          )
          .await;
        },
    )
    .await?;
  3. Update the config.yaml file to the new format. Update starting_version to the version that is last saved in the processor_status table.
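
The SDK example above passes MAX_DIESEL_PARAM_SIZE / EventModel::field_count() as the chunk size. The reasoning is that PostgreSQL accepts at most 65,535 bind parameters per statement, and each inserted row uses one parameter per field. The sketch below works through that arithmetic; MAX_PARAMS is assumed to mirror the SDK constant, and the 10-field row is a hypothetical example.

```rust
/// Assumption: mirrors the SDK's MAX_DIESEL_PARAM_SIZE. PostgreSQL allows
/// at most 65,535 bind parameters in a single statement.
const MAX_PARAMS: usize = u16::MAX as usize;

/// Largest number of rows that fit in one INSERT when every row binds
/// fields_per_row parameters.
fn chunk_size(fields_per_row: usize) -> usize {
    assert!(fields_per_row > 0, "a row must have at least one field");
    MAX_PARAMS / fields_per_row
}

fn main() {
    // Hypothetical model with 10 fields per row.
    let size = chunk_size(10);
    // 20,000 placeholder rows, split into chunks that stay under the limit.
    let rows: Vec<u32> = (0..20_000).collect();
    let chunks: Vec<&[u32]> = rows.chunks(size).collect();
    println!("{} rows per chunk, {} chunks", size, chunks.len());
}
```

Each chunk then becomes one INSERT statement, so wider rows mean smaller chunks and more statements per batch.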