Quickstart Guide on Aptos Indexer SDK
What to expect from this guide
This guide will walk you through setting up and running a Rust processor to index events on the Aptos blockchain into PostgreSQL. We provide a template processor that you can customize to index events from your custom contracts. By the end of the guide, you should have a basic understanding of how a processor works and be able to customize the processor for your indexing needs.
Get started
To get started, clone the aptos-indexer-processor-sdk repo:

# HTTPS
git clone https://github.com/aptos-labs/aptos-indexer-processor-sdk.git

# SSH
git clone git@github.com:aptos-labs/aptos-indexer-processor-sdk.git
Processors consume transactions from the Transaction Stream Service. To use the Labs-Hosted Transaction Stream Service, you need an authorization token. Follow this guide to get a token from the Developer Portal. Create an API key for Testnet, as this tutorial is for Testnet.
Once you’re done, you should have a token that looks like this:
aptoslabs_yj4bocpaKy_Q6RBP4cdBmjA8T51hto1GcVX5ZS9S65dx
You also need the following tools:
- Rust 1.79: Installation Guide
- Cargo: Installation Guide
We use PostgreSQL as our database and Diesel as our ORM in this tutorial. You’re free to use whatever you want, but this tutorial is geared towards PostgreSQL for the sake of simplicity. We use the following database configuration and tools:
PostgreSQL Installation (for macOS)

- Install the Postgres C API library: brew install libpq. Then run all of the post-installation export commands:
  export PATH="/opt/homebrew/opt/libpq/bin:$PATH"
  export LDFLAGS="-L/opt/homebrew/opt/libpq/lib"
  export CPPFLAGS="-I/opt/homebrew/opt/libpq/include"
- Install PostgreSQL: brew install postgres
- Start the server: pg_ctl -D /opt/homebrew/var/postgres start or brew services start postgresql
- Create the postgres user: /opt/homebrew/bin/createuser -s postgres
- Ensure you're able to run: psql postgres
- Install the Diesel CLI: cargo install diesel_cli --no-default-features --features postgres
- Make sure that you're in the DB folder (run cd src/db/postgres from the base directory), then run diesel migration run --database-url postgresql://localhost/postgres
  - If for some reason this database is already being used, try a different database, e.g. DATABASE_URL=postgres://postgres@localhost:5432/indexer_v2 diesel database reset
- We will use a database hosted on localhost on port 5432, which should be the default.
- When you create your username, keep track of it and the password you use for it.
- To easily view your database data, consider using a GUI like DBeaver (recommended), pgAdmin, or Postico.
Set up your environment
Make sure to start the postgresql service:

The command for Linux/WSL might be something like:

sudo service postgresql start

For macOS, if you're using brew, start it up with:

brew services start postgresql
Configure your processor
Now let’s set up the configuration details for the actual indexer processor we’re going to use.
Set up your config.yaml file
In the example folder, there is a sample config.yaml file that should look something like this:
# This is a template yaml for the processor
health_check_port: 8085
server_config:
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.mainnet.aptoslabs.com:443"
    auth_token: "AUTH_TOKEN"
    request_name_header: "events-processor"
    starting_version: 0
  postgres_config:
    connection_string: postgresql://postgres:@localhost:5432/example
Open the config.yaml file and update these fields:

- auth_token: the auth token you got from the Developer Portal
- connection_string (under postgres_config): the connection string to your PostgreSQL database
More customization with config.yaml
You can customize additional configuration with the config.yaml file.

To start at a specific ledger version, you can specify the version in the config.yaml file with:

starting_version: <Starting Version>

To stop processing at a specific ledger version, you can specify the ending version with:

request_ending_version: <Ending Version>
If you want to use a different network, change the indexer_grpc_data_service_address field to the desired value:

# Devnet
indexer_grpc_data_service_address: grpc.devnet.aptoslabs.com:443

# Testnet
indexer_grpc_data_service_address: grpc.testnet.aptoslabs.com:443

# Mainnet
indexer_grpc_data_service_address: grpc.mainnet.aptoslabs.com:443

In this tutorial we are using Testnet, so update indexer_grpc_data_service_address to grpc.testnet.aptoslabs.com:443.
Create the events processor
At a high level, each processor is responsible for receiving a stream of transactions, parsing and transforming the relevant data, and storing the data into a database.
Define the database schema
In src/db/migrations, you will see the events migration, which defines the database schema used to store the events.
CREATE TABLE events (
    sequence_number BIGINT NOT NULL,
    creation_number BIGINT NOT NULL,
    account_address VARCHAR(66) NOT NULL,
    transaction_version BIGINT NOT NULL,
    transaction_block_height BIGINT NOT NULL,
    type TEXT NOT NULL,
    data JSONB NOT NULL,
    inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
    event_index BIGINT NOT NULL,
    indexed_type VARCHAR(300) NOT NULL,
    PRIMARY KEY (transaction_version, event_index)
);
When you apply migrations, Diesel will re-generate the schema.rs file, which looks like this:
diesel::table! {
    events (transaction_version, event_index) {
        sequence_number -> Int8,
        creation_number -> Int8,
        #[max_length = 66]
        account_address -> Varchar,
        transaction_version -> Int8,
        transaction_block_height -> Int8,
        #[sql_name = "type"]
        type_ -> Text,
        data -> Jsonb,
        inserted_at -> Timestamp,
        event_index -> Int8,
        #[max_length = 300]
        indexed_type -> Varchar,
    }
}
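For reference, here is a sketch of how the generated events table can map to a Diesel insertable struct. This is illustrative only: the example processor ships its own EventModel, and this sketch assumes Diesel 2.x with its serde_json feature enabled for the Jsonb column.

use diesel::prelude::*;
use serde_json::Value;

use crate::schema::events;

// Illustrative model matching the generated schema.rs above;
// the example repo defines its own EventModel.
#[derive(Clone, Debug, Insertable)]
#[diesel(table_name = events)]
pub struct EventModel {
    pub sequence_number: i64,
    pub creation_number: i64,
    pub account_address: String,
    pub transaction_version: i64,
    pub transaction_block_height: i64,
    pub type_: String, // maps to the SQL column `type` via #[sql_name]
    pub data: Value,
    pub event_index: i64,
    pub indexed_type: String,
    // inserted_at is omitted; Postgres fills it via DEFAULT NOW()
}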
In schema.rs, you'll see two other important tables:

- ledger_infos, which tracks the chain ID of the ledger being indexed
- processor_status, which tracks the last_success_version of the processor
Define the processing logic
The file src/main.rs contains the code which defines the events processor. The key components are:
- insert_events_query defines the diesel query to insert events into the database:

fn insert_events_query(
    items_to_insert: Vec<EventModel>,
) -> impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send {
    use crate::schema::events::dsl::*;

    diesel::insert_into(crate::schema::events::table)
        .values(items_to_insert)
        .on_conflict((transaction_version, event_index))
        .do_nothing()
}
- process is a helper function that wraps around a regular processor. In the background, it handles connecting to Transaction Stream, processing transactions with a transform function that you define, applying database migrations, and tracking the processor's status:

process(
    "events_processor".to_string(), // name of the processor that will be used to track the processor status
    MIGRATIONS, // migrations to be applied to the database
    async |transactions, conn_pool| {
        // transform from transaction to events and insert the events into the database
    },
)
.await?;
Run the processor
With the config.yaml you created earlier, you're ready to run the events processor:

cd examples/postgres-basic-events-example
cargo run --release -- -c config.yaml
You should see the processor start to index Aptos blockchain events!
{"timestamp":"2024-08-15T01:06:35.169217Z","level":"INFO","message":"[Transaction Stream] Received transactions from GRPC.","stream_address":"https://grpc.testnet.aptoslabs.com/","connection_id":"5575cb8c-61fb-498f-aaae-868d1e8773ac","start_version":0,"end_version":4999,"start_txn_timestamp_iso":"1970-01-01T00:00:00.000000000Z","end_txn_timestamp_iso":"2022-09-09T01:49:02.023089000Z","num_of_transactions":5000,"size_in_bytes":5708539,"duration_in_secs":0.310734,"tps":16078,"bytes_per_sec":18371143.80788713,"filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e1e1bdd/rust/transaction-stream/src/transaction_stream.rs","line_number":400,"threadName":"tokio-runtime-worker","threadId":"ThreadId(6)"}
{"timestamp":"2024-08-15T01:06:35.257756Z","level":"INFO","message":"Events version [0, 4999] stored successfully","filename":"src/processors/events/events_storer.rs","line_number":75,"threadName":"tokio-runtime-worker","threadId":"ThreadId(10)"}
{"timestamp":"2024-08-15T01:06:35.257801Z","level":"INFO","message":"Finished processing events from versions [0, 4999]","filename":"src/processors/events/events_processor.rs","line_number":90,"threadName":"tokio-runtime-worker","threadId":"ThreadId(17)"}
Customize the processor
In most cases, you want to index events from your own contracts. The example processor offers a good starting point to creating your own custom processor.
To customize the processor to index events from your custom contract, you can make these changes:
- Change the database schema to a format that better matches your dapp or API. a. Create a new migration with diesel:
diesel migration generate {migration_name}
b. Add your migration changes to up.sql
and down.sql
, then apply the migration:
diesel migration run --database-url={YOUR_DATABASE_URL}
c. The schema.rs
file will be updated automatically. You can then create a diesel query that uses the new schema.
2. Update the transform logic in process()
. You can filter by specific event types and extract specific event data from your custom contract
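For example, a transform function that indexes only the events emitted by your contract could filter on the event type before inserting. This is a sketch, not the example's actual code: the event type 0x42::my_module::MyEvent is a hypothetical placeholder, and process_events, insert_events_query, execute_in_chunks, and MAX_DIESEL_PARAM_SIZE are the helpers used by the example processor (shown in the migration section below).

process(
    "my_contract_events_processor".to_string(),
    MIGRATIONS,
    async |transactions, conn_pool| {
        // Extract all events, then keep only those emitted by our contract.
        // "0x42::my_module::MyEvent" is a hypothetical placeholder type.
        let events: Vec<EventModel> = process_events(transactions)
            .into_iter()
            .filter(|event| event.type_ == "0x42::my_module::MyEvent")
            .collect();

        // Insert the filtered events with the same chunked insert the example uses.
        let execute_res = execute_in_chunks(
            conn_pool.clone(),
            insert_events_query,
            &events,
            MAX_DIESEL_PARAM_SIZE / EventModel::field_count(),
        )
        .await;
    },
)
.await?;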
Migrate from legacy processors
If you're migrating from the legacy processors, you can still start with the same steps above to create a new processor with the Indexer SDK. You'll also need to follow these steps:

- Copy your migration files to src/db/.
- Move your processing logic into the SDK's process transform function. With the legacy processors, the processing logic is defined inside the process_transactions method:
// Example with the legacy processors
#[async_trait]
impl ProcessorTrait for EventsProcessor {
    async fn process_transactions(
        ...
    ) -> anyhow::Result<ProcessingResult> {
        // Extract events from transactions
        let events: Vec<EventModel> = process_events(transactions);

        // Store the events in the database
        let tx_result = insert_to_db(
            self.get_pool(),
            self.name(),
            start_version,
            end_version,
            &events,
            &self.per_table_chunk_sizes,
        )
        .await;
        return tx_result;
    }
}
Migrate to the SDK by copying the logic in the process_transactions method into the SDK's process transform function:
// Example with SDK processor
process(
    "events_processor".to_string(),
    MIGRATIONS,
    async |transactions, conn_pool| {
        // Extract events from transactions
        let events: Vec<EventModel> = process_events(transactions);

        // Store events in the database
        let execute_res = execute_in_chunks(
            conn_pool.clone(),
            insert_events_query,
            &events,
            MAX_DIESEL_PARAM_SIZE / EventModel::field_count(),
        )
        .await;
    },
)
.await?;
- Update the config.yaml file to the new format. Update starting_version to the version that is last saved in the processor_status table.