Migrate to Indexer SDK
This guide explains how to migrate a legacy custom processor (one written the old way) to the Indexer SDK.
1. Clone the example repo
We use the example events processor in `aptos-indexer-processor-example` as the starting point for the migration.
```sh
git clone https://github.com/aptos-labs/aptos-indexer-processor-example.git
```
2. Migrate your processor config
Previously, you would create a branch of `aptos-indexer-processors` and update the processor config to include your custom processor. This legacy approach made it very difficult to upgrade your processor. To address this, the SDK no longer depends on `aptos-indexer-processors`.
As a result, you'll need to define your own `IndexerProcessorConfig` and `ProcessorConfig` structs. The `IndexerProcessorConfig` defines the base configuration for all processors that you'll be running, and the `ProcessorConfig` is an enum that contains all the individual processor configs.
Update the following files in your project:

- `ProcessorConfig`: Replace `EventsProcessor` with your processor.
- `IndexerProcessorConfig`: Update the `.run()` method to include your processor.
If you’d like to read more about configuration in the SDK, take a look at the Create a Processor guide.
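For illustration, here is a minimal sketch of what a `ProcessorConfig` enum can look like, assuming a serde internally-tagged representation so that the `type` field in `config.yaml` selects the variant. The derive attributes, variant names, and the `name()` helper below are assumptions; match them to the example repo:

```rust
use serde::{Deserialize, Serialize};

// Sketch only: the `type` field in config.yaml is matched against these variants.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ProcessorConfig {
    // TODO: Replace EventsProcessor with a variant for your processor
    EventsProcessor,
}

impl ProcessorConfig {
    /// Returns the snake_case name used in config.yaml and status tracking.
    pub fn name(&self) -> &'static str {
        match self {
            ProcessorConfig::EventsProcessor => "events_processor",
        }
    }
}
```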
3. Migrate processing logic to steps
Previously, you defined your processor's logic by implementing the `ProcessorTrait`'s `process_transactions` method. Here is the example events processor written the old way:
```rust
#[async_trait]
impl ProcessorTrait for EventsProcessor {
    async fn process_transactions(
        ...
    ) -> anyhow::Result<ProcessingResult> {
        // Extract events from transactions
        let events: Vec<EventModel> = process_events(transactions);

        // Store the events in the database
        let tx_result = insert_to_db(
            self.get_pool(),
            self.name(),
            start_version,
            end_version,
            &events,
            &self.per_table_chunk_sizes,
        )
        .await;

        return tx_result;
    }
}

async fn insert_to_db(
    conn: ArcDbPool,
    name: &'static str,
    start_version: u64,
    end_version: u64,
    events: &[EventModel],
    per_table_chunk_sizes: &AHashMap<String, usize>,
) -> Result<(), diesel::result::Error> {
    tracing::trace!(
        name = name,
        start_version = start_version,
        end_version = end_version,
        "Inserting to db",
    );
    execute_in_chunks(
        conn,
        insert_events_query,
        events,
        get_config_table_chunk_size::<EventModel>("events", per_table_chunk_sizes),
    )
    .await?;
    Ok(())
}
```
With the SDK, we’ve introduced the concept of steps, which represent independent units of processing logic.
In the `EventsProcessor` example, the extraction of events and storing them in the database can be broken down into two steps. To migrate your processor to the SDK, you'll need to define these steps in your processor. You can use the `EventsExtractor` and `EventsStorer` steps in the example as a starting point for defining your own steps.
Make the following changes to `events_extractor.rs`:
```rust
// TODO: Update the step name
pub struct EventsExtractor
where
    Self: Sized + Send + 'static,
{}

#[async_trait]
impl Processable for EventsExtractor {
    type Input = Vec<Transaction>;
    // TODO: Update the output type
    // This should be the data model you're extracting from the transactions
    type Output = Vec<EventModel>;
    type RunType = AsyncRunType;

    async fn process(
        &mut self,
        item: TransactionContext<Vec<Transaction>>,
    ) -> Result<Option<TransactionContext<Vec<EventModel>>>, ProcessorError> {
        // TODO: Update extraction logic.
        // This should be the same as the extraction logic in the old `process_transactions` method
        let events = item
            .data
            .par_iter()
            .map(|txn| process_events(txn))
            .flatten()
            .collect::<Vec<EventModel>>();

        Ok(Some(TransactionContext {
            data: events,
            metadata: item.metadata,
        }))
    }
}
```
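The extractor also needs the same `AsyncStep` and `NamedStep` implementations that the storer below has. A minimal sketch following that pattern:

```rust
// Marker impl: this step processes batches asynchronously with no polling.
impl AsyncStep for EventsExtractor {}

// The step name appears in logs and metrics.
impl NamedStep for EventsExtractor {
    fn name(&self) -> String {
        "EventsExtractor".to_string()
    }
}
```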
Make the following changes to `events_storer.rs`:
```rust
pub struct EventsStorer
where
    Self: Sized + Send + 'static,
{
    conn_pool: ArcDbPool,
    processor_config: DefaultProcessorConfig,
}

impl EventsStorer {
    pub fn new(conn_pool: ArcDbPool, processor_config: DefaultProcessorConfig) -> Self {
        Self {
            conn_pool,
            processor_config,
        }
    }
}

#[async_trait]
// TODO: Update step name
impl Processable for EventsStorer {
    // TODO: Update input type for the step.
    // The input type should match the output type of the extractor step.
    type Input = Vec<EventModel>;
    type Output = ();
    type RunType = AsyncRunType;

    async fn process(
        &mut self,
        events: TransactionContext<Vec<EventModel>>,
    ) -> Result<Option<TransactionContext<()>>, ProcessorError> {
        let per_table_chunk_sizes: AHashMap<String, usize> = AHashMap::new();
        let execute_res = execute_in_chunks(
            self.conn_pool.clone(),
            // TODO: Update this to the insertion query of your old processor
            insert_events_query,
            &events.data,
            get_config_table_chunk_size::<EventModel>("events", &per_table_chunk_sizes),
        )
        .await;
        match execute_res {
            Ok(_) => Ok(Some(TransactionContext {
                data: (),
                metadata: events.metadata,
            })),
            Err(e) => Err(ProcessorError::DBStoreError {
                message: format!(
                    "Failed to store events versions {} to {}: {:?}",
                    events.metadata.start_version, events.metadata.end_version, e,
                ),
                query: None,
            }),
        }
    }
}

impl AsyncStep for EventsStorer {}

impl NamedStep for EventsStorer {
    fn name(&self) -> String {
        "EventsStorer".to_string()
    }
}
```
4. Migrate your processor
Now that we've migrated the processing logic to steps, we also need to migrate the processor itself to instantiate the steps and connect them together.
In `events_processor.rs`, make the following changes:
```rust
// TODO: Update processor name
pub struct EventsProcessor {
    pub config: IndexerProcessorConfig,
    pub db_pool: ArcDbPool,
    // If you have any other fields in your processor, add them here
    // You can instantiate them accordingly in the processor's `new` method
}
```
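For reference, the example processor's `new` method builds the connection pool from the config. The `DbConfig::PostgresConfig` variant and `new_db_pool` helper below are assumptions based on the example repo; adapt them to your setup:

```rust
impl EventsProcessor {
    pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
        match config.db_config {
            // Assumed variant name; check your DbConfig definition
            DbConfig::PostgresConfig(ref postgres_config) => {
                // Build the async connection pool used by the storer step
                let conn_pool = new_db_pool(
                    &postgres_config.connection_string,
                    Some(postgres_config.db_pool_size),
                )
                .await
                .expect("Failed to create connection pool");

                Ok(Self {
                    config,
                    db_pool: conn_pool,
                })
            },
        }
    }
}
```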
In the `run_processor` method, you'll need to update the code to use the steps you created in Step 3.
```rust
pub async fn run_processor(self) -> Result<()> {
    {...}

    // Define processor steps
    let transaction_stream_config = self.config.transaction_stream_config.clone();
    let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
        starting_version: Some(starting_version),
        ..transaction_stream_config
    })
    .await?;
    // TODO: Replace the next 2 lines with your steps
    let events_extractor = EventsExtractor {};
    // `processor_config` is extracted from `self.config` in the elided setup above,
    // matching the two-argument `EventsStorer::new` defined in Step 3
    let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config);

    let version_tracker = VersionTrackerStep::new(
        get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
        DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
    );

    // Connect processor steps together
    let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
        transaction_stream.into_runnable_step(),
    )
    // TODO: Replace the next 2 lines with your steps
    .connect_to(events_extractor.into_runnable_step(), 10)
    .connect_to(events_storer.into_runnable_step(), 10)
    .connect_to(version_tracker.into_runnable_step(), 10)
    .end_and_return_output_receiver(10);

    {...}
}
```
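The trailing `{...}` typically drains the output receiver so the pipeline keeps making progress. A sketch along the lines of the example repo (the exact log messages are illustrative):

```rust
// Read from the output channel until the stream ends; each item is the
// TransactionContext emitted by the last step in the chain.
loop {
    match buffer_receiver.recv().await {
        Ok(txn_context) => {
            debug!(
                "Finished processing versions [{:?}, {:?}]",
                txn_context.metadata.start_version, txn_context.metadata.end_version,
            );
        },
        Err(e) => {
            info!("No more transactions in channel: {:?}", e);
            break Ok(());
        },
    }
}
```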
5. Update your config.yaml
`IndexerProcessorConfig` reworks the format of the `config.yaml` file. Use the example `config.yaml` as a reference:
```yaml
health_check_port: 8085
server_config:
  processor_config:
    # TODO: Update with processor type
    type: "events_processor"
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
    # TODO: Update auth token
    auth_token: "AUTH_TOKEN"
    # TODO: Update with processor name
    request_name_header: "events-processor"
  db_config:
    # TODO: Update with your database connection string
    postgres_connection_string: postgresql://postgres:@localhost:5432/example
  # backfill_config:
  #   backfill_alias: "events_processor_backfill_1"
```
6. Run your migrated processor
```sh
cd ~/{DIRECTORY_OF_PROJECT}/aptos-indexer-processor-example
cargo run --release -- -c config.yaml
```
In your terminal, you should start to see logs like this:
{"timestamp":"2025-01-13T21:23:21.785452Z","level":"INFO","message":"[Transaction Stream] Successfully connected to GRPC stream","stream_address":"https://grpc.mainnet.aptoslabs.com/","connection_id":"ec67ecc4-e041-4f17-a2e2-441e7ff21487","start_version":2186504987,"filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/transaction-stream/src/transaction_stream.rs","line_number":349,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}{"timestamp":"2025-01-13T21:23:21.785664Z","level":"INFO","message":"Spawning polling task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}{"timestamp":"2025-01-13T21:23:21.785693Z","level":"INFO","message":"Spawning processing task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}{"timestamp":"2025-01-13T21:23:21.785710Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetExtractor","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}{"timestamp":"2025-01-13T21:23:21.785912Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetStorer","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}{"timestamp":"2025-01-13T21:23:21.785978Z","level":"INFO","message":"Spawning polling task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}{"timestamp":"2025-01-13T21:23:21.786018Z","level":"INFO","message":"Spawning processing task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}
7. Backfilling with the SDK
With the SDK, we've made some improvements to the backfilling process. There are two options for backfilling:
- You can keep following the old way of backfilling: run a second instance of the processor and update `starting_version` to the backfill version.
- The SDK introduces an improvement where you can track the progress of a backfill and start and stop the backfill as needed.
If you'd like to use the new backfilling process, update your `config.yaml` like so:
```yaml
health_check_port: 8085
server_config:
  processor_config:
    # TODO: Update with processor type
    type: "events_processor"
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
    # TODO: Update with backfill version
    starting_version: {backfill version}
    # TODO: Update auth token
    auth_token: "AUTH_TOKEN"
    # TODO: Update with processor name
    request_name_header: "events-processor"
  db_config:
    # TODO: Update with your database connection string
    postgres_connection_string: postgresql://postgres:@localhost:5432/example
  backfill_config:
    # TODO: Update with your backfill alias. This should be unique for each backfill
    backfill_alias: "events_processor_backfill_1"
```