Migrate to Indexer SDK
This guide explains how to migrate a legacy custom processor (one written against the old aptos-indexer-processors framework) to the Indexer SDK.
1. Clone the example repo
We use the example events processor in aptos-indexer-processor-example as the starting point for the migration.
git clone https://github.com/aptos-labs/aptos-indexer-processor-example.git
2. Migrate your processor config
Previously, you would create a branch of aptos-indexer-processors and update the processor config to include your custom processor. This legacy approach made it very difficult to upgrade your processor. To address this, the SDK no longer depends on aptos-indexer-processors.
As a result, you’ll need to define your own IndexerProcessorConfig and ProcessorConfig structs. The IndexerProcessorConfig defines the base configuration for all processors that you’ll be running, and the ProcessorConfig is an enum that contains all the individual processor configs.
Update the following files in your project:
- ProcessorConfig: Replace EventsProcessor with your processor.
- IndexerProcessorConfig: Update the .run() method to include your processor.
If you’d like to read more about configuration in the SDK, take a look at the Create a Processor guide.
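For orientation, here is a minimal sketch of what these two structs might look like, inferred from the example repo’s config.yaml; the exact field names, derives, and variant names are assumptions, so treat the example’s config module as the source of truth.

use serde::Deserialize;

// Hypothetical sketch inferred from config.yaml; field names are assumptions.
#[derive(Clone, Debug, Deserialize)]
pub struct IndexerProcessorConfig {
    pub processor_config: ProcessorConfig,
    pub transaction_stream_config: TransactionStreamConfig,
    pub db_config: DbConfig,
    pub backfill_config: Option<BackfillConfig>,
}

// One variant per processor; the "type" field in config.yaml selects the variant.
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ProcessorConfig {
    EventsProcessor(DefaultProcessorConfig),
    // TODO: Replace with a variant for your own processor, e.g.:
    // MyCustomProcessor(DefaultProcessorConfig),
}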
3. Migrate processing logic to steps
In the old way, you defined your processor’s logic by implementing ProcessorTrait’s process_transactions method.
An example events processor written the old way:
#[async_trait]
impl ProcessorTrait for EventsProcessor {
    async fn process_transactions(
        ...
    ) -> anyhow::Result<ProcessingResult> {
        // Extract events from transactions
        let events: Vec<EventModel> = process_events(transactions);

        // Store the events in the database
        let tx_result = insert_to_db(
            self.get_pool(),
            self.name(),
            start_version,
            end_version,
            &events,
            &self.per_table_chunk_sizes,
        )
        .await;
        return tx_result;
    }
}
async fn insert_to_db(
    conn: ArcDbPool,
    name: &'static str,
    start_version: u64,
    end_version: u64,
    events: &[EventModel],
    per_table_chunk_sizes: &AHashMap<String, usize>,
) -> Result<(), diesel::result::Error> {
    tracing::trace!(
        name = name,
        start_version = start_version,
        end_version = end_version,
        "Inserting to db",
    );
    execute_in_chunks(
        conn,
        insert_events_query,
        events,
        get_config_table_chunk_size::<EventModel>("events", per_table_chunk_sizes),
    )
    .await?;
    Ok(())
}
With the SDK, we’ve introduced the concept of steps, which represent independent units of processing logic.
In the EventsProcessor example, extracting events and storing them in the database can be broken down into two steps. To migrate your processor to the SDK, you’ll need to define these steps in your processor. You can use the EventsExtractor and EventsStorer steps in the example as a starting point for defining your own steps.
Make the following changes to events_extractor.rs:
// TODO: Update the step name
pub struct EventsExtractor
where
    Self: Sized + Send + 'static,
{}

#[async_trait]
impl Processable for EventsExtractor {
    type Input = Vec<Transaction>;
    // TODO: Update the output type.
    // This should be the data model you're extracting from the transactions.
    type Output = Vec<EventModel>;
    type RunType = AsyncRunType;

    async fn process(
        &mut self,
        item: TransactionContext<Vec<Transaction>>,
    ) -> Result<Option<TransactionContext<Vec<EventModel>>>, ProcessorError> {
        // TODO: Update extraction logic.
        // This should be the same as the extraction logic in the old `process_transactions` method.
        let events = item
            .data
            .par_iter()
            .map(|txn| process_events(txn))
            .flatten()
            .collect::<Vec<EventModel>>();
        Ok(Some(TransactionContext {
            data: events,
            metadata: item.metadata,
        }))
    }
}
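The extractor also needs AsyncStep and NamedStep implementations so the SDK can run it and report its name, mirroring the EventsStorer impls shown below:

impl AsyncStep for EventsExtractor {}

impl NamedStep for EventsExtractor {
    fn name(&self) -> String {
        // TODO: Update with your step's name
        "EventsExtractor".to_string()
    }
}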
Make the following changes to events_storer.rs:
pub struct EventsStorer
where
    Self: Sized + Send + 'static,
{
    conn_pool: ArcDbPool,
    processor_config: DefaultProcessorConfig,
}

impl EventsStorer {
    pub fn new(conn_pool: ArcDbPool, processor_config: DefaultProcessorConfig) -> Self {
        Self {
            conn_pool,
            processor_config,
        }
    }
}

#[async_trait]
// TODO: Update step name
impl Processable for EventsStorer {
    // TODO: Update input type for the step.
    // The input type should match the output type of the extractor step.
    type Input = Vec<EventModel>;
    type Output = ();
    type RunType = AsyncRunType;

    async fn process(
        &mut self,
        events: TransactionContext<Vec<EventModel>>,
    ) -> Result<Option<TransactionContext<()>>, ProcessorError> {
        let per_table_chunk_sizes: AHashMap<String, usize> = AHashMap::new();
        let execute_res = execute_in_chunks(
            self.conn_pool.clone(),
            // TODO: Update this to the insertion query of your old processor
            insert_events_query,
            &events.data,
            get_config_table_chunk_size::<EventModel>("events", &per_table_chunk_sizes),
        )
        .await;
        match execute_res {
            Ok(_) => Ok(Some(TransactionContext {
                data: (),
                metadata: events.metadata,
            })),
            Err(e) => Err(ProcessorError::DBStoreError {
                message: format!(
                    "Failed to store events versions {} to {}: {:?}",
                    events.metadata.start_version, events.metadata.end_version, e,
                ),
                query: None,
            }),
        }
    }
}

impl AsyncStep for EventsStorer {}

impl NamedStep for EventsStorer {
    fn name(&self) -> String {
        "EventsStorer".to_string()
    }
}
4. Migrate your processor
Now that we’ve migrated the processing logic to steps, we need to also migrate the processor to instantiate the steps and connect them together.
In events_processor.rs, make the following changes:
// TODO: Update processor name
pub struct EventsProcessor {
    pub config: IndexerProcessorConfig,
    pub db_pool: ArcDbPool,
    // If you have any other fields in your processor, add them here.
    // You can instantiate them accordingly in the processor's `new` method.
}
In the run_processor method, you’ll need to update the code to use the steps you created in Step 3.
pub async fn run_processor(self) -> Result<()> {
    {...}

    // Define processor steps
    let transaction_stream_config = self.config.transaction_stream_config.clone();
    let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
        starting_version: Some(starting_version),
        ..transaction_stream_config
    })
    .await?;
    // TODO: Replace the next 2 lines with your steps
    let events_extractor = EventsExtractor {};
    // `processor_config` is the DefaultProcessorConfig pulled out of
    // `self.config.processor_config` in the elided setup above
    let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config);
    let version_tracker = VersionTrackerStep::new(
        get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
        DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
    );

    // Connect processor steps together
    let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
        transaction_stream.into_runnable_step(),
    )
    // TODO: Replace the next 2 lines with your steps
    .connect_to(events_extractor.into_runnable_step(), 10)
    .connect_to(events_storer.into_runnable_step(), 10)
    .connect_to(version_tracker.into_runnable_step(), 10)
    .end_and_return_output_receiver(10);

    {...}
}
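The elided tail of run_processor is where the pipeline’s output channel gets drained so it doesn’t back up. A minimal sketch of that loop, assuming the tracing macros and TransactionContext metadata fields used elsewhere in this guide:

// Hypothetical sketch of the elided receiver loop; adapt to your processor.
loop {
    match buffer_receiver.recv().await {
        Ok(txn_context) => {
            info!(
                "Finished processing versions [{:?}, {:?}]",
                txn_context.metadata.start_version, txn_context.metadata.end_version,
            );
        },
        Err(e) => {
            // Channel closed: the upstream steps have shut down.
            info!("No more transactions in channel: {:?}", e);
            break Ok(());
        },
    }
}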
5. Update your config.yaml
IndexerProcessorConfig reworks the format of the config.yaml file. Use the example config.yaml as a reference:
health_check_port: 8085
server_config:
  processor_config:
    # TODO: Update with processor type
    type: "events_processor"
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
    # TODO: Update auth token
    auth_token: "AUTH_TOKEN"
    # TODO: Update with processor name
    request_name_header: "events-processor"
  db_config:
    # TODO: Update with your database connection string
    postgres_connection_string: postgresql://postgres:@localhost:5432/example
  # backfill_config:
  #   backfill_alias: "events_processor_backfill_1"
6. Run your migrated processor
cd ~/{DIRECTORY_OF_PROJECT}/aptos-indexer-processor-example
cargo run --release -- -c config.yaml
In your terminal, you should start to see logs like this:
{"timestamp":"2025-01-13T21:23:21.785452Z","level":"INFO","message":"[Transaction Stream] Successfully connected to GRPC stream","stream_address":"https://grpc.mainnet.aptoslabs.com/","connection_id":"ec67ecc4-e041-4f17-a2e2-441e7ff21487","start_version":2186504987,"filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/transaction-stream/src/transaction_stream.rs","line_number":349,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785664Z","level":"INFO","message":"Spawning polling task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}
{"timestamp":"2025-01-13T21:23:21.785693Z","level":"INFO","message":"Spawning processing task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}
{"timestamp":"2025-01-13T21:23:21.785710Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetExtractor","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785912Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetStorer","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785978Z","level":"INFO","message":"Spawning polling task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}
{"timestamp":"2025-01-13T21:23:21.786018Z","level":"INFO","message":"Spawning processing task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}
7. Backfilling with the SDK
With the SDK, we’ve made some improvements to the backfilling process. There are two options for backfilling:

- You can keep following the old way of backfilling: run a second instance of the processor and update starting_version to the backfill version.
- The SDK introduces an improvement where you can track the progress of a backfill and start and stop the backfill as needed.

If you’d like to use the new backfilling process, update your config.yaml like so:
health_check_port: 8085
server_config:
  processor_config:
    # TODO: Update with processor type
    type: "events_processor"
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
    # TODO: Update with backfill version
    starting_version: {backfill version}
    # TODO: Update auth token
    auth_token: "AUTH_TOKEN"
    # TODO: Update with processor name
    request_name_header: "events-processor"
  db_config:
    # TODO: Update with your database connection string
    postgres_connection_string: postgresql://postgres:@localhost:5432/example
  backfill_config:
    # TODO: Update with your backfill alias. This should be unique for each backfill.
    backfill_alias: "events_processor_backfill_1"