
Migrate to Indexer SDK

This guide explains how to migrate a legacy custom processor (one written against aptos-indexer-processors) to the Indexer SDK.

1. Clone the example repo

We use the example events processor in aptos-indexer-processor-example as the starting point for the migration.

git clone https://github.com/aptos-labs/aptos-indexer-processor-example.git

2. Migrate your processor config

Previously, you would create a branch of aptos-indexer-processors and update the processor config to include your custom processor. This legacy approach made it very difficult to upgrade your processor. To address this, the SDK no longer depends on aptos-indexer-processors. As a result, you’ll need to define your own IndexerProcessorConfig and ProcessorConfig structs.

The IndexerProcessorConfig defines the base configuration for all processors that you’ll be running. The ProcessorConfig is an enum that contains all the individual processor configs.
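As a rough illustration of how the two types relate, here is a minimal sketch using only the standard library. The field names are taken from the config.yaml shown later in this guide; the `EventsProcessor` variant, the `name` method, and the `example_config` helper are assumptions made for this sketch. The structs in the example repo will differ (for instance, they need to be deserializable from YAML).

```rust
// Simplified, hypothetical shapes. Not the example repo's actual definitions.

/// Per-processor settings: one enum variant per processor you run.
#[derive(Clone, Debug, PartialEq)]
pub enum ProcessorConfig {
    // Assumed variant, corresponding to `type: "events_processor"` in config.yaml.
    EventsProcessor,
}

impl ProcessorConfig {
    /// Name used to identify the processor.
    pub fn name(&self) -> &'static str {
        match self {
            ProcessorConfig::EventsProcessor => "events_processor",
        }
    }
}

#[derive(Clone, Debug, PartialEq)]
pub struct TransactionStreamConfig {
    pub indexer_grpc_data_service_address: String,
    pub starting_version: Option<u64>,
    pub auth_token: String,
    pub request_name_header: String,
}

#[derive(Clone, Debug, PartialEq)]
pub struct DbConfig {
    pub postgres_connection_string: String,
}

#[derive(Clone, Debug, PartialEq)]
pub struct BackfillConfig {
    pub backfill_alias: String,
}

/// Base configuration shared by every processor you run.
#[derive(Clone, Debug, PartialEq)]
pub struct IndexerProcessorConfig {
    pub processor_config: ProcessorConfig,
    pub transaction_stream_config: TransactionStreamConfig,
    pub db_config: DbConfig,
    // Optional: only set when running a tracked backfill.
    pub backfill_config: Option<BackfillConfig>,
}

/// Builds the value that the example config.yaml in this guide describes.
pub fn example_config() -> IndexerProcessorConfig {
    IndexerProcessorConfig {
        processor_config: ProcessorConfig::EventsProcessor,
        transaction_stream_config: TransactionStreamConfig {
            indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443".to_string(),
            starting_version: None,
            auth_token: "AUTH_TOKEN".to_string(),
            request_name_header: "events-processor".to_string(),
        },
        db_config: DbConfig {
            postgres_connection_string: "postgresql://postgres:@localhost:5432/example".to_string(),
        },
        backfill_config: None,
    }
}
```

Adding a second processor would then mean adding a variant to `ProcessorConfig`, while `IndexerProcessorConfig` stays unchanged.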

Update the following files in your project:

If you’d like to read more about configuration in the SDK, take a look at the Create a Processor guide.

3. Migrate processing logic to steps

Previously, you defined your processor’s logic by implementing ProcessorTrait’s process_transactions method.

Example events processor written the legacy way:

#[async_trait]
impl ProcessorTrait for EventsProcessor {
    async fn process_transactions(
        ...
    ) -> anyhow::Result<ProcessingResult> {
        // Extract events from transactions 
        let events: Vec<EventModel> = process_events(transactions);
 
        // Store the events in the database
        let tx_result = insert_to_db(
            self.get_pool(),
            self.name(),
            start_version,
            end_version,
            &events,
            &self.per_table_chunk_sizes,
        )
        .await;
 
        return tx_result;
    }
}
 
async fn insert_to_db(
    conn: ArcDbPool,
    name: &'static str,
    start_version: u64,
    end_version: u64,
    events: &[EventModel],
    per_table_chunk_sizes: &AHashMap<String, usize>,
) -> Result<(), diesel::result::Error> {
    tracing::trace!(
        name = name,
        start_version = start_version,
        end_version = end_version,
        "Inserting to db",
    );
    execute_in_chunks(
        conn,
        insert_events_query,
        events,
        get_config_table_chunk_size::<EventModel>("events", per_table_chunk_sizes),
    )
    .await?;
    Ok(())
}

With the SDK, we’ve introduced the concept of steps, which represent independent units of processing logic. In the EventsProcessor example, the extraction of events and storing them in the database can be broken down into two steps.
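To make the idea of steps concrete, here is a simplified, synchronous sketch that uses only the standard library. `Context`, `Step`, `Txn`, `EventRow`, `Extractor`, `Storer`, and `run_batch` are all hypothetical stand-ins invented for this illustration; the SDK's own `Processable` trait is async and has a different signature. The point is only that each step has a typed input and output, and that the output of one step must match the input of the next.

```rust
/// A batch of data plus the version range it covers
/// (a stand-in for the SDK's `TransactionContext`).
#[derive(Debug, Clone, PartialEq)]
pub struct Context<T> {
    pub data: T,
    pub start_version: u64,
    pub end_version: u64,
}

/// Simplified, synchronous stand-in for a processing step.
pub trait Step {
    type Input;
    type Output;
    fn process(
        &mut self,
        item: Context<Self::Input>,
    ) -> Result<Option<Context<Self::Output>>, String>;
}

/// Hypothetical transaction: a version and the event payloads it emitted.
#[derive(Debug, Clone)]
pub struct Txn {
    pub version: u64,
    pub events: Vec<String>,
}

/// Hypothetical row to be stored.
#[derive(Debug, Clone, PartialEq)]
pub struct EventRow {
    pub version: u64,
    pub index: usize,
    pub payload: String,
}

/// Step 1: turn transactions into rows.
pub struct Extractor;

impl Step for Extractor {
    type Input = Vec<Txn>;
    type Output = Vec<EventRow>;

    fn process(
        &mut self,
        item: Context<Vec<Txn>>,
    ) -> Result<Option<Context<Vec<EventRow>>>, String> {
        let rows: Vec<EventRow> = item
            .data
            .iter()
            .flat_map(|txn| {
                txn.events.iter().enumerate().map(move |(index, payload)| EventRow {
                    version: txn.version,
                    index,
                    payload: payload.clone(),
                })
            })
            .collect();
        // The version range is passed through unchanged.
        Ok(Some(Context {
            data: rows,
            start_version: item.start_version,
            end_version: item.end_version,
        }))
    }
}

/// Step 2: store rows. A Vec stands in for the database table.
pub struct Storer {
    pub table: Vec<EventRow>,
}

impl Step for Storer {
    type Input = Vec<EventRow>;
    type Output = ();

    fn process(&mut self, item: Context<Vec<EventRow>>) -> Result<Option<Context<()>>, String> {
        self.table.extend(item.data);
        Ok(Some(Context {
            data: (),
            start_version: item.start_version,
            end_version: item.end_version,
        }))
    }
}

/// Runs one batch through both steps in order.
pub fn run_batch(
    extractor: &mut Extractor,
    storer: &mut Storer,
    batch: Context<Vec<Txn>>,
) -> Result<Option<Context<()>>, String> {
    match extractor.process(batch)? {
        Some(extracted) => storer.process(extracted),
        None => Ok(None),
    }
}
```

Because the storer only sees `Vec<EventRow>`, it can be tested or replaced without touching the extraction logic.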

To migrate your processor to the SDK, you’ll need to define these steps in your processor. You can use the EventsExtractor and EventsStorer steps in the example as a starting point for defining your own steps.

Make the following changes to events_extractor.rs.

// TODO: Update the step name
pub struct EventsExtractor
where
    Self: Sized + Send + 'static, {}
 
#[async_trait]
impl Processable for EventsExtractor {
    type Input = Vec<Transaction>;
    // TODO: Update the output type
    // This should be the data model you're extracting from the transactions
    type Output = Vec<EventModel>;
    type RunType = AsyncRunType;
 
    async fn process(
        &mut self,
        item: TransactionContext<Vec<Transaction>>,
    ) -> Result<Option<TransactionContext<Vec<EventModel>>>, ProcessorError> {
        // TODO: Update extraction logic. 
        // This should be the same as the extraction logic in the old `process_transactions` method
        let events = item
            .data
            .par_iter()
            .map(|txn| {
                process_events(txn)
            })
            .flatten()
            .collect::<Vec<EventModel>>();
 
        Ok(Some(TransactionContext {
            data: events,
            metadata: item.metadata,
        }))
    }
}
 
// As with EventsStorer below, the step also needs these impls
// so that it can be turned into a runnable step.
impl AsyncStep for EventsExtractor {}
 
impl NamedStep for EventsExtractor {
    fn name(&self) -> String {
        "EventsExtractor".to_string()
    }
}

Make the following changes to events_storer.rs.

pub struct EventsStorer
where
    Self: Sized + Send + 'static,
{
    conn_pool: ArcDbPool,
    processor_config: DefaultProcessorConfig,
}
 
impl EventsStorer {
    pub fn new(conn_pool: ArcDbPool, processor_config: DefaultProcessorConfig) -> Self {
        Self {
            conn_pool,
            processor_config,
        }
    }
}
 
#[async_trait]
// TODO: Update step name
impl Processable for EventsStorer {
    // TODO: Update input type for the step. 
    // The input type should match the output type of the extractor step.
    type Input = Vec<EventModel>;
    type Output = ();
    type RunType = AsyncRunType;
 
    async fn process(
        &mut self,
        events: TransactionContext<Vec<EventModel>>,
    ) -> Result<Option<TransactionContext<()>>, ProcessorError> {
        let per_table_chunk_sizes: AHashMap<String, usize> = AHashMap::new();
        let execute_res = execute_in_chunks(
            self.conn_pool.clone(),
            // TODO: Update this to the insertion query of your old processor
            insert_events_query,
            &events.data,
            get_config_table_chunk_size::<EventModel>("events", &per_table_chunk_sizes),
        )
        .await;
        match execute_res {
            Ok(_) => {
                Ok(Some(TransactionContext {
                    data: (),
                    metadata: events.metadata,
                }))
            },
            Err(e) => Err(ProcessorError::DBStoreError {
                message: format!(
                    "Failed to store events versions {} to {}: {:?}",
                    events.metadata.start_version, events.metadata.end_version, e,
                ),
                query: None,
            }),
        }
    }
}
 
impl AsyncStep for EventsStorer {}
 
impl NamedStep for EventsStorer {
    fn name(&self) -> String {
        "EventsStorer".to_string()
    }
}
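The storer passes a per-table chunk size to execute_in_chunks. The following is a minimal sketch of what chunked insertion means, assuming the goal is to split one large batch into several smaller insert statements. `DEFAULT_CHUNK_SIZE`, `chunk_size_for`, and `insert_in_chunks` are hypothetical names for this illustration, not the SDK's helpers, and the real default value may differ.

```rust
use std::collections::HashMap;

/// Hypothetical fallback used when a table has no configured chunk size.
const DEFAULT_CHUNK_SIZE: usize = 100;

/// Looks up the chunk size for `table`, falling back to the default.
/// The result is never zero.
pub fn chunk_size_for(table: &str, per_table: &HashMap<String, usize>) -> usize {
    per_table
        .get(table)
        .copied()
        .unwrap_or(DEFAULT_CHUNK_SIZE)
        .max(1)
}

/// Calls `insert` once per chunk of at most `chunk_size` rows and stops at the
/// first error. Returns the number of insert calls that were made.
pub fn insert_in_chunks<T, E>(
    rows: &[T],
    chunk_size: usize,
    mut insert: impl FnMut(&[T]) -> Result<(), E>,
) -> Result<usize, E> {
    let mut statements = 0;
    // `chunks` panics on a size of zero, so clamp to at least one row per chunk.
    for chunk in rows.chunks(chunk_size.max(1)) {
        insert(chunk)?;
        statements += 1;
    }
    Ok(statements)
}
```

In this sketch, seven rows with a chunk size of three result in three insert calls carrying 3, 3, and 1 rows.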

4. Migrate your processor

Now that we’ve migrated the processing logic to steps, we need to also migrate the processor to instantiate the steps and connect them together. In events_processor.rs, make the following changes:

// TODO: Update processor name
pub struct EventsProcessor {
    pub config: IndexerProcessorConfig,
    pub db_pool: ArcDbPool,
    // If you have any other fields in your processor, add them here
    // You can instantiate them accordingly in the processor's `new` method
}

In the run_processor method, you’ll need to update the code to use the steps you created in Step 3.

pub async fn run_processor(self) -> Result<()> {
    {...}
 
    // Define processor steps
    let transaction_stream_config = self.config.transaction_stream_config.clone();
    let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
        starting_version: Some(starting_version),
        ..transaction_stream_config
    })
    .await?;
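    // TODO: Replace the next 2 lines with your steps.
    // Pass whatever arguments your steps' constructors require; the `EventsStorer::new`
    // shown in Step 3 also takes a `DefaultProcessorConfig` as its second argument.
    let events_extractor = EventsExtractor {};
    let events_storer = EventsStorer::new(self.db_pool.clone());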
    
    let version_tracker = VersionTrackerStep::new(
        get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
        DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
    );
 
    // Connect processor steps together
    let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
        transaction_stream.into_runnable_step(),
    )
    // TODO: Replace the next 2 lines with your steps
    .connect_to(events_extractor.into_runnable_step(), 10)
    .connect_to(events_storer.into_runnable_step(), 10)
    .connect_to(version_tracker.into_runnable_step(), 10)
    .end_and_return_output_receiver(10);
 
    {...}
}
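The sketch below illustrates how steps can be connected, assuming the second argument to connect_to (10 in the code above) is the capacity of the channel between two steps. It uses standard-library threads and bounded channels instead of the SDK's async runtime. The `connect_to` and `run_pipeline` functions here are hypothetical stand-ins written for this illustration, not the SDK's ProcessorBuilder API.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Spawns a worker that applies `f` to every item from `input` and forwards the
/// result through a bounded channel holding at most `capacity` pending items.
pub fn connect_to<I, O, F>(input: Receiver<I>, capacity: usize, mut f: F) -> Receiver<O>
where
    I: Send + 'static,
    O: Send + 'static,
    F: FnMut(I) -> O + Send + 'static,
{
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        for item in input {
            // `send` blocks while the channel is full, so a slow downstream
            // step slows down the upstream steps instead of growing a queue.
            if tx.send(f(item)).is_err() {
                break; // The downstream receiver was dropped.
            }
        }
    });
    rx
}

/// Wires a source and two steps together and collects the final outputs.
pub fn run_pipeline(batches: Vec<Vec<u64>>) -> Vec<usize> {
    // Source: stands in for the transaction stream step.
    let (source_tx, source_rx) = sync_channel::<Vec<u64>>(10);
    let producer = thread::spawn(move || {
        for batch in batches {
            if source_tx.send(batch).is_err() {
                break;
            }
        }
    });

    // "Extractor": keep only even values from each batch.
    let extracted = connect_to(source_rx, 10, |versions: Vec<u64>| {
        versions.into_iter().filter(|v| v % 2 == 0).collect::<Vec<u64>>()
    });
    // "Storer": report how many rows each batch would have written.
    let stored = connect_to(extracted, 10, |rows: Vec<u64>| rows.len());

    // The iterator ends once every upstream sender has been dropped.
    let out: Vec<usize> = stored.iter().collect();
    producer.join().expect("producer thread panicked");
    out
}
```

A small capacity keeps memory use bounded at the cost of less buffering between a fast step and a slow one.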

5. Update your config.yaml

IndexerProcessorConfig changes the format of the config.yaml file. Use the example config.yaml as a starting point:

health_check_port: 8085
server_config:
  processor_config:
    # TODO: Update with processor type
    type: "events_processor" 
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
    # TODO: Update auth token
    auth_token: "AUTH_TOKEN"
    # TODO: Update with processor name
    request_name_header: "events-processor"
  db_config:
    # TODO: Update with your database connection string
    postgres_connection_string: postgresql://postgres:@localhost:5432/example
  # backfill_config:
  #   backfill_alias: "events_processor_backfill_1"

6. Run your migrated processor

cd ~/{DIRECTORY_OF_PROJECT}/aptos-indexer-processor-example
cargo run --release -- -c config.yaml

In your terminal, you should start to see logs like the following. The step names and stream address will reflect your own steps and config:

{"timestamp":"2025-01-13T21:23:21.785452Z","level":"INFO","message":"[Transaction Stream] Successfully connected to GRPC stream","stream_address":"https://grpc.mainnet.aptoslabs.com/","connection_id":"ec67ecc4-e041-4f17-a2e2-441e7ff21487","start_version":2186504987,"filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/transaction-stream/src/transaction_stream.rs","line_number":349,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785664Z","level":"INFO","message":"Spawning polling task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}
{"timestamp":"2025-01-13T21:23:21.785693Z","level":"INFO","message":"Spawning processing task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}
{"timestamp":"2025-01-13T21:23:21.785710Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetExtractor","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785912Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetStorer","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785978Z","level":"INFO","message":"Spawning polling task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}
{"timestamp":"2025-01-13T21:23:21.786018Z","level":"INFO","message":"Spawning processing task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}

7. Backfilling with the SDK

With the SDK, we’ve made some improvements to the backfilling process. There are two options for backfilling:

  1. You can keep following the legacy approach, which is to run a second instance of the processor and update starting_version to the backfill version.
  2. The SDK lets you track the progress of a backfill and start and stop it as needed. If you’d like to use the new backfilling process, update your config.yaml like so:

health_check_port: 8085
server_config:
  processor_config:
    # TODO: Update with processor type
    type: "events_processor" 
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
    # TODO: Update with backfill version
    starting_version: {backfill version}
    # TODO: Update auth token
    auth_token: "AUTH_TOKEN"
    # TODO: Update with processor name
    request_name_header: "events-processor"
  db_config:
    # TODO: Update with your database connection string
    postgres_connection_string: postgresql://postgres:@localhost:5432/example
  backfill_config:
    # TODO: Update with your backfill alias. This should be unique for each backfill
    backfill_alias: "events_processor_backfill_1"
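
To illustrate why the backfill alias should be unique, here is a minimal sketch of alias-keyed progress tracking. It assumes progress is recorded per alias, so that a stopped backfill can resume where it left off. `BackfillTracker` and its methods are hypothetical, and the in-memory map stands in for wherever the SDK persists backfill status; this is not the SDK's implementation.

```rust
use std::collections::HashMap;

/// In-memory stand-in for a backfill status store keyed by backfill alias.
#[derive(Default)]
pub struct BackfillTracker {
    last_success: HashMap<String, u64>,
}

impl BackfillTracker {
    /// Version to start from: the configured `starting_version` on the first
    /// run, otherwise one past the last version recorded for this alias.
    pub fn resume_version(&self, alias: &str, starting_version: u64) -> u64 {
        match self.last_success.get(alias) {
            Some(last) => last + 1,
            None => starting_version,
        }
    }

    /// Records that everything up to and including `end_version` was processed.
    /// Progress never moves backwards.
    pub fn record(&mut self, alias: &str, end_version: u64) {
        let entry = self
            .last_success
            .entry(alias.to_string())
            .or_insert(end_version);
        if end_version > *entry {
            *entry = end_version;
        }
    }
}
```

Under this assumption, reusing an alias from an earlier backfill would resume from that backfill's recorded progress instead of from the new starting_version.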