aptos-labs / aptos-indexer-processors
Set of core processors that index data on the Aptos blockchain
Home Page: https://aptos.dev/indexer/indexer-landing
Using docker compose and the coin_flip example processor, I believe I am streaming events and detecting coin flip events, but nothing is getting written to the Postgres database. I have verified the connection strings and the database and table names, and I am able to connect through a SQL tool, but the tables are always empty.
Does this log output mean that it should be writing an event?
index-processor-1 | {"timestamp": "2024-01-19 22:08:10,486", "level": "INFO", "fields": {"message": "[Parser] DB insertion time of one batch of transactions", "processor_name": "coin_flip", "start_version": "628091000", "end_version": "628091999", "service_type": "processor", "num_of_transactions": "1000", "duration_in_secs": "0.00018042", "size_in_bytes": "2939880"}, "module": "worker", "func_name": "run", "path_name": "/app/utils/worker.py", "line_no": 538}
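One way to sanity-check that log line (a sketch; the JSON fields are copied, abridged, from the output above): divide the reported insertion duration by the batch size. A sub-microsecond per-transaction "insertion" time would suggest the batch produced no rows to write, rather than a failed write.

```python
import json

# Log fields copied (abridged) from the processor output above.
log_line = (
    '{"fields": {"message": "[Parser] DB insertion time of one batch of transactions", '
    '"processor_name": "coin_flip", "start_version": "628091000", "end_version": "628091999", '
    '"num_of_transactions": "1000", "duration_in_secs": "0.00018042"}}'
)

fields = json.loads(log_line)["fields"]
duration = float(fields["duration_in_secs"])
count = int(fields["num_of_transactions"])
per_txn_us = duration / count * 1e6

# ~0.18 microseconds per transaction: far below a real Postgres round trip,
# which hints the batch matched no coin_flip events and wrote zero rows.
print(f"{per_txn_us:.3f} us per transaction")
```

If that holds, the processor is likely filtering out every transaction (for example a module address mismatch in config.yaml) rather than failing on the write path.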
config.yaml
# Connecting to testnet
starting_version: 626589722
# Optional. Stop processor after ending_version.
ending_version: 646589730
docker-compose.yaml
version: '3'
services:
  index-processor:
    build: .
    environment:
      DB_CONNECTION_URI: postgresql://coin_flip:postgres@db:5432/coin_flip
    depends_on:
      - db
    volumes:
      - ./config.yaml:/app/config/config.yaml
  db:
    image: postgres:15.2
    environment:
      POSTGRES_USER: coin_flip
      POSTGRES_PASSWORD: postgres
    ports:
      - "5432:5432"
    volumes:
      - db-data:/var/lib/postgresql/data
volumes:
  db-data:
From the same experiment as above, there are a few other errors that were consistent (all with the TS processors):
A lot of this error: 13 INTERNAL: Received RST_STREAM with code 0. This sometimes kept happening even with repeated retries, and only went away after switching back and forth among a couple of different API keys.
The gRPC service also returns duplicate transaction entries from time to time, up to a few times for the same transaction, for example transaction version 3014600. This seems to start around transaction versions ~200K.
In general, it'd be great if we could reduce this kind of error, or at least have better error messages and maybe better out-of-the-box error handling/retry.
https://aptos-org.slack.com/archives/C03MN5F7WUV/p1701542240169189
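Until the duplicates are fixed server-side, a client can guard against them by tracking the next expected version and dropping anything already seen (a sketch using plain dicts in place of the real transaction protos):

```python
def dedup_stream(transactions):
    """Yield transactions in order, skipping duplicate versions.

    `transactions` is any iterable of records carrying a version
    (here simulated with plain dicts).
    """
    next_expected = None
    for txn in transactions:
        version = txn["version"]
        if next_expected is not None and version < next_expected:
            continue  # duplicate or replayed transaction, drop it
        next_expected = version + 1
        yield txn

# Simulated stream where version 3014600 arrives three times.
stream = [{"version": v} for v in [3014599, 3014600, 3014600, 3014600, 3014601]]
print([t["version"] for t in dedup_stream(stream)])  # [3014599, 3014600, 3014601]
```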
The gRPC service seems to cut my connection every 5 minutes. It reconnects to the stream successfully (I'm using the Python version), but every 5 minutes, I get the following message:
2024-05-14 11:48:16 | WARNING | utils.worker:producer:221 - [Parser] RpcError receiving datastream response. | {'processor_name': 'staker', 'stream_address': 'grpc.testnet.aptoslabs.com:443', 'error': '<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.UNKNOWN\n\tdetails = "Stream removed"\n\tdebug_error_string = "UNKNOWN:Error received from peer ipv4:34.110.202.98:443 {grpc_message:"Stream removed", grpc_status:2, created_time:"2024-05-14T11:48:16.622325+01:00"}"\n>', 'next_version_to_fetch': 979680402, 'ending_version': None, 'service_type': 'processor'}
I'd like to know if it's a configuration issue on the test server.
To reproduce: just run the Python indexer and wait 5 minutes.
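As a workaround for the periodic disconnects, the consumer loop can treat "Stream removed" as a resumable error and reconnect from next_version_to_fetch with exponential backoff (a sketch; fetch_stream is a hypothetical stand-in for the gRPC call, not the processor's real API):

```python
import time

def consume_with_reconnect(fetch_stream, start_version, max_retries=5, base_delay=0.01):
    """Consume a versioned stream, reconnecting from the last version on error.

    `fetch_stream(version)` is a hypothetical generator of (version, payload)
    pairs that may raise RuntimeError mid-stream (e.g. "Stream removed").
    """
    next_version = start_version
    retries = 0
    results = []
    while retries <= max_retries:
        try:
            for version, payload in fetch_stream(next_version):
                results.append((version, payload))
                next_version = version + 1
                retries = 0  # progress made, reset the backoff
            return results  # stream ended cleanly
        except RuntimeError:
            retries += 1
            time.sleep(base_delay * (2 ** retries))  # exponential backoff
    raise RuntimeError(f"gave up after {max_retries} retries at version {next_version}")

# Simulated stream that dies once after two items, then completes.
calls = {"n": 0}
def fetch_stream(version):
    calls["n"] += 1
    for v in range(version, version + 4):
        if calls["n"] == 1 and v == version + 2:
            raise RuntimeError("Stream removed")
        yield v, f"txn-{v}"

print([v for v, _ in consume_with_reconnect(fetch_stream, 100)])  # [100, 101, 102, 103, 104, 105]
```

Resetting the retry counter on progress distinguishes the routine 5-minute cuts (which resume fine) from a genuinely dead stream.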
Currently we have only one starting_version field. This field applies unconditionally, even if there is a version in the DB. This means that if the user wants to start from the version in the DB if present, but otherwise from a given version, there is no way to do that without a tricky sequence of starts and stops, adding and removing starting_version from the config.
We should have two fields (more concise naming TBD):
starting_version_if_nothing_in_db: If given, start from this version if nothing is in the DB.
starting_version_no_matter_what: Start from this version even if there is something in the DB.
How to build the GraphQL API server in Python or in Rust?
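The resolution order for the two proposed starting-version fields could be sketched like this (field names as proposed; db_version stands for whatever checkpoint the processor finds in the DB):

```python
def resolve_starting_version(db_version, starting_version_no_matter_what=None,
                             starting_version_if_nothing_in_db=None):
    """Pick the version a processor should start from.

    Precedence: the unconditional override wins, then the DB checkpoint,
    then the fallback for an empty DB, then genesis (0).
    """
    if starting_version_no_matter_what is not None:
        return starting_version_no_matter_what
    if db_version is not None:
        return db_version
    if starting_version_if_nothing_in_db is not None:
        return starting_version_if_nothing_in_db
    return 0

# Fresh DB: fall back to the configured version.
print(resolve_starting_version(None, starting_version_if_nothing_in_db=626589722))
# DB has a checkpoint: resume from it.
print(resolve_starting_version(628091999, starting_version_if_nothing_in_db=626589722))
# Unconditional override: restart from scratch regardless of the DB.
print(resolve_starting_version(628091999, starting_version_no_matter_what=0))
```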
While using
const client = new IndexerClient(
"https://indexer.mainnet.aptoslabs.com/v1/graphql"
);
const txnDetails = await client.getAccountTransactionsData(
"0x62818ab1a3567b03bdb19078e42b774fbdab279a2bf5dfae886d29461feb1fcd"
);
console.log("txnDetails: ", txnDetails);
This is the error I get:
{
"errors": [
{
"message": "Connection template evaluation failed: 'Object' has no attritubte 'operation_name'.",
"extensions": {
"path": "$",
"code": "template-resolution-failed"
}
}
]
}
Transaction 1023992588 on testnet is failing deserialization, causing whole batches to be skipped.
The batch skipping bit has been added in #352.
This approach doesn't actually skip batches; it moves the start of the next batch forward by 1 and tries again.
For the sake of argument, let's say we have a bad transaction at version 3456 and the batch size is consistently 5000.
If we're processing a batch that goes from txn 2000 to txn 6999, the process will fail and restart, this time trying to process txn 2001 to txn 7000. It will then fail again and again until we get past txn 3456, at which point the process will resume without errors.
The problem with that, continuing the example above, is that if I'm interested in a transaction at version 3000, I'm never going to see it, because it will always be in a bad batch.
Ideally, the bad transaction would not fail: it would either be deserialised properly, or its bad fields would be ignored.
We've had to add a "slow mode" in our code: if we see a deserialisation failure, we restart the stream asking for 1 transaction at a time until we fail again, at which point we know we've processed the actual bad transaction and can restart the stream in full-speed mode.
Run the Python indexer starting a bit before transaction 1023992588.
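The "slow mode" workaround described above can be sketched as follows (the list of versions and the set of bad versions are a simulation; the real code fetches from the gRPC stream and fails on deserialisation):

```python
def process_with_slow_mode(versions, bad_versions, batch_size=5000):
    """Process versions in batches; on a failure, fall back to one-at-a-time.

    `bad_versions` simulates transactions that fail deserialisation. In slow
    mode every transaction is fetched alone, so only the bad one is skipped
    and every good transaction around it is still processed.
    """
    processed, skipped = [], []
    i = 0
    while i < len(versions):
        batch = versions[i:i + batch_size]
        if not any(v in bad_versions for v in batch):
            processed.extend(batch)          # full-speed mode
        else:
            for v in batch:                  # slow mode: one transaction per fetch
                if v in bad_versions:
                    skipped.append(v)        # the single bad transaction
                else:
                    processed.append(v)
        i += len(batch)
    return processed, skipped

processed, skipped = process_with_slow_mode(list(range(2000, 7000)), {3456}, batch_size=5000)
print(len(processed), skipped)  # 4999 [3456]
```

Unlike moving the batch start forward by 1, the transaction at version 3000 is still processed here; only the genuinely bad version 3456 is lost.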
@banool @bowenyang007 Per offline discussion
None of the READMEs are updated. This is the only place we actually updated: https://aptos.dev/indexer/legacy/migration#2-migrate-processors-to-transaction-stream-service
I'm using the indexer GraphQL API account_transactions for an NFT trade transaction.
It appears some data is missing in both token_activities and token_activities_v2: the previous owner (the seller) is null, while the value should be 0xd6e3ad94ed9d1f628d6b4e1a287378158beb0930f4d0be0f89de683386746f53
Sharing the response here:
"token_activities_v2": [
  {
    "aptos_names_from": [],
    "aptos_names_to": [],
    "before_value": null,
    "from_address": null,
    "is_fungible_v2": null,
    "to_address": "0x629ed8449b71c464d253159b5f8b26a5c26bce40dfdb7420a279041c380e4464",
    "token_amount": 1,
    "token_data_id": "0xab34d7afd9fb00e0008a6181dfa485ac3c6ae98b43b74b6de0fd4be1c58f9e5b",
    "token_standard": "v1",
    "transaction_timestamp": "2023-12-13T09:25:33.089239",
    "transaction_version": 359534234,
    "type": "0x3::token::DepositEvent",
    "property_version_v1": 0,
    "event_account_address": "0x629ed8449b71c464d253159b5f8b26a5c26bce40dfdb7420a279041c380e4464",
    "event_index": 0,
    "entry_function_id_str": "0x2c7bccf7b31baf770fdbcc768d9e9cb3d87805e255355df5db32ac9a669010a2::marketplace_v2::buy"
  }
]
Is it a bug or intentional?
Query link:
Query variables:
{
  "transaction_version": 359534234,
  "account_address": "0x629ed8449b71c464d253159b5f8b26a5c26bce40dfdb7420a279041c380e4464"
}
There appears to be a memory leak, as evidenced by the steadily increasing memory consumption observed during indexing. Valgrind was used to analyze memory allocations, revealing that a significant portion of memory is allocated within the transaction vector. After each batch is processed, the memory is not released and consumption continues to grow. After two minutes of indexing, the indexer's RAM usage exceeds 5 GB and keeps increasing.
massif report:
->12.31% (76,360,304B) 0x2285542: alloc (alloc.rs:98)
| ->12.31% (76,360,304B) 0x2285542: alloc::alloc::Global::alloc_impl (alloc.rs:181)
| ->12.31% (76,360,304B) 0x2286318: <alloc::alloc::Global as core::alloc::Allocator>::allocate (alloc.rs:241)
| ->12.31% (76,360,304B) 0x228607E: alloc::raw_vec::finish_grow (raw_vec.rs:521)
| ->05.87% (36,373,376B) 0x6E834A: alloc::raw_vec::RawVec<T,A>::grow_amortized (raw_vec.rs:433)
| | ->05.87% (36,373,376B) 0x70A8B8: alloc::raw_vec::RawVec<T,A>::reserve_for_push (raw_vec.rs:318)
| | ->05.87% (36,373,376B) 0xAF5A77: alloc::vec::Vec<T,A>::push (mod.rs:1922)
| | ->05.87% (36,373,376B) 0xB91356: prost::encoding::message::merge_repeated (encoding.rs:1114)
| | ->04.03% (24,965,408B) 0xC78005: <aptos_protos::pb::aptos::transaction::v1::MoveStructTag as prost::message::Message>::merge_field (aptos.transaction.v1.rs:739)
| | | ->04.03% (24,965,408B) 0xB9D7F6: prost::encoding::message::merge::{{closure}} (encoding.rs:1086)
| | | ->04.03% (24,965,408B) 0x139AC98: prost::encoding::merge_loop (encoding.rs:374)
| | | ->04.03% (24,965,408B) 0xB96C34: prost::encoding::message::merge (encoding.rs:1080)
| | | ->04.02% (24,932,544B) 0xC7857A: <aptos_protos::pb::aptos::transaction::v1::WriteResource as prost::message::Message>::merge_field (aptos.transaction.v1.rs:398)
| | | | ->04.02% (24,932,544B) 0xBA2456: prost::encoding::message::merge::{{closure}} (encoding.rs:1086)
| | | | ->04.02% (24,932,544B) 0x139E8F8: prost::encoding::merge_loop (encoding.rs:374)
| | | | ->04.02% (24,932,544B) 0xB95734: prost::encoding::message::merge (encoding.rs:1080)
| | | | ->04.02% (24,932,544B) 0x8861EA: aptos_protos::pb::aptos::transaction::v1::write_set_change::Change::merge (aptos.transaction.v1.rs:329)
| | | | ->04.02% (24,932,544B) 0xC688AA: <aptos_protos::pb::aptos::transaction::v1::WriteSetChange as prost::message::Message>::merge_field (aptos.transaction.v1.rs:278)
| | | | ->04.02% (24,932,544B) 0xBA06F6: prost::encoding::message::merge::{{closure}} (encoding.rs:1086)
| | | | ->04.02% (24,932,544B) 0x13A2838: prost::encoding::merge_loop (encoding.rs:374)
| | | | ->04.02% (24,932,544B) 0xB94234: prost::encoding::message::merge (encoding.rs:1080)
| | | | ->04.02% (24,932,544B) 0xB909BA: prost::encoding::message::merge_repeated (encoding.rs:1113)
| | | | ->04.02% (24,932,544B) 0xC69C4E: <aptos_protos::pb::aptos::transaction::v1::TransactionInfo as prost::message::Message>::merge_field (aptos.transaction.v1.rs:166)
| | | | ->04.02% (24,932,544B) 0xB9B7A6: prost::encoding::message::merge::{{closure}} (encoding.rs:1086)
| | | | ->04.02% (24,932,544B) 0x139B818: prost::encoding::merge_loop (encoding.rs:374)
| | | | ->04.02% (24,932,544B) 0xB96B34: prost::encoding::message::merge (encoding.rs:1080)
| | | | ->04.02% (24,932,544B) 0xC75E54: <aptos_protos::pb::aptos::transaction::v1::Transaction as prost::message::Message>::merge_field (aptos.transaction.v1.rs:37)
| | | | ->04.02% (24,932,544B) 0xB9F846: prost::encoding::message::merge::{{closure}} (encoding.rs:1086)
| | | | ->04.02% (24,932,544B) 0x139EBD8: prost::encoding::merge_loop (encoding.rs:374)
| | | | ->04.02% (24,932,544B) 0xB95A34: prost::encoding::message::merge (encoding.rs:1080)
| | | | ->04.02% (24,932,544B) 0xB91B0A: prost::encoding::message::merge_repeated (encoding.rs:1113)
| | | ->00.01% (32,864B) in 1+ places, all below ms_print's threshold (01.00%)
Run indexer (commit 48d7794) with config:
health_check_port: 8084
server_config:
  processor_config:
    type: coin_processor
  indexer_grpc_data_service_address: https://grpc.testnet.aptoslabs.com:443
  postgres_connection_string: ***********
  auth_token: **************
  number_concurrent_processing_tasks: 1
  starting_version: 951262066
Presently, processor insertions are nondeterministic when concurrent processing tasks are enabled, such that the processor must be pinned to single-threading to enforce total ordering of transactions.
In practice, this slows down processing, in particular doubling or even tripling the time to sync to chain tip, for example with the Econia Data Service Stack (https://econia.dev/off-chain/dss/data-service-stack).
Offline notes and suggestions:
@larry-aptos, perhaps we could devise some kind of multi worker sequential execution scheme.
One relatively simple implementation is to allow for a reduce step. Currently we create multiple threads (map) and these write directly to the DB, but if we could instead collect the results of the threads and run them through a reduce against data already in the DB, we could achieve ordering.
Myself:
- Cache insertions in a Postgres table and only insert into the main tables once colliding threads are complete
- Execute each insertion as a subtransaction of an overall Postgres transaction, committing once colliding threads are complete
cc @CRBl69
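The map/reduce idea above could be sketched like this: batches are processed concurrently, but results are committed strictly in submission order by a single reducer (process_batch is a hypothetical stand-in; the real reduce would run against the DB):

```python
from concurrent.futures import ThreadPoolExecutor

def process_batch(batch):
    # Hypothetical stand-in for parsing one batch of transactions.
    return [v * 2 for v in batch]

def run_ordered(batches, workers=4):
    """Map: process batches concurrently. Reduce: commit in submission order.

    Executor.map yields results in the order the batches were submitted,
    regardless of which worker finishes first, so the commit log is totally
    ordered even though the processing itself is parallel.
    """
    commit_log = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for result in pool.map(process_batch, batches):
            commit_log.append(result)  # single-threaded, ordered "reduce"
    return commit_log

print(run_ordered([[1, 2], [3, 4], [5, 6]]))  # [[2, 4], [6, 8], [10, 12]]
```

The trade-off is that the reducer buffers out-of-order completions, so throughput is bounded by the slowest in-flight batch rather than collapsing to full single-threading.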
The GraphQL API has no support for the validator_transaction transaction type.
Using the list of available GraphQL tables (https://cloud.hasura.io/public/graphiql?endpoint=https://api.mainnet.aptoslabs.com/v1/graphql) I can find only the following transaction-related tables:
block_metadata_transactions
user_transactions
account_transactions
But no validator_transaction table. This type was added and used only recently, but it is important to have.
Example of a transaction of this type: https://explorer.aptoslabs.com/txn/975859175?network=mainnet
fn insert_current_coin_balances (https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/processor/src/processors/coin_processor.rs#L216): if two or more records with the same (owner_address, coin_type_hash) end up in one chunk, the insertion will be impossible (ON CONFLICT DO UPDATE). For example:
INSERT INTO current_coin_balances (owner_address, coin_type_hash, coin_type,
amount, last_transaction_version, last_transaction_timestamp)
VALUES
('0x1', '0x1', 'test', 1, 1, now()),
('0x1', '0x1', 'test', 1, 2, now())
ON CONFLICT (owner_address, coin_type_hash) DO UPDATE
SET
amount = excluded.amount,
last_transaction_version = excluded.last_transaction_version
WHERE current_coin_balances.last_transaction_version <= excluded.last_transaction_version;
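A common fix is to deduplicate the chunk before the INSERT, keeping only the row with the highest last_transaction_version per (owner_address, coin_type_hash), since Postgres rejects a multi-row ON CONFLICT DO UPDATE that would touch the same row twice ("ON CONFLICT DO UPDATE command cannot affect row a second time"). A sketch with tuples in place of the real structs:

```python
def dedup_coin_balances(rows):
    """Keep one row per (owner_address, coin_type_hash) conflict key.

    Rows are (owner_address, coin_type_hash, coin_type, amount, version)
    tuples; for duplicates we keep the highest last_transaction_version,
    matching the WHERE clause of the upsert above.
    """
    latest = {}
    for row in rows:
        key = (row[0], row[1])
        if key not in latest or row[4] > latest[key][4]:
            latest[key] = row
    return list(latest.values())

rows = [
    ("0x1", "0x1", "test", 1, 1),
    ("0x1", "0x1", "test", 1, 2),  # same conflict key, newer version: wins
]
print(dedup_coin_balances(rows))  # [('0x1', '0x1', 'test', 1, 2)]
```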
The focus contract address alpha feature is presumably intended to simplify client filtering behavior: only events corresponding to a particular Move package are allowed through by the top-level worker process.
However, the underlying implementation appears to filter based on the public entry function rather than the event address. E.g. if a project composes on top of another package and invokes the underlying package via a wrapped function call, then the wrapping package's events (which are also the wrapped package's events) get removed.
Instead, it is suggested that filtration rely on the address of the emitted event types, or alternatively, that focus_contract_addresses be renamed to something like entry_function_address, with an additional event_type_address option.
server_config:
  transaction_filter:
    focus_contract_addresses:
      - $ECONIA_ADDRESS
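Filtering by the address of the emitted event type, as suggested, only needs the address prefix of the fully qualified type name (a sketch; event type strings follow the address::module::Name convention, and ECONIA_ADDRESS is a made-up placeholder for $ECONIA_ADDRESS):

```python
def event_type_address(event_type):
    """Return the account address prefix of a Move event type string."""
    return event_type.split("::", 1)[0]

def keep_event(event_type, focus_addresses):
    """Keep an event if its type was declared at a focused address,
    regardless of which entry function (wrapper or not) was invoked."""
    return event_type_address(event_type) in focus_addresses

ECONIA_ADDRESS = "0xc0de"  # hypothetical placeholder
print(keep_event(f"{ECONIA_ADDRESS}::market::PlaceLimitOrderEvent", {ECONIA_ADDRESS}))  # True
print(keep_event("0xdead::wrapper::WrapperEvent", {ECONIA_ADDRESS}))  # False
```

Because the check inspects each event rather than the entry function, events from the focus package survive even when invoked through another package's wrapper.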
Execute the following query in https://cloud.hasura.io/public/graphiql?endpoint=https://indexer.mainnet.aptoslabs.com/v1/graphql
query MyQuery {
current_token_ownerships_v2(where: {token_data_id: {
_eq: "0x5a7b9686aeb01a38bf2450385fd07a9202c5a0f7d7c41495a4f58e8d40984f4e"
}}
) {
amount
owner_address
last_transaction_version
}
}
The result is
{
"data": {
"current_token_ownerships_v2": [
{
"amount": 0,
"owner_address": "0xadeb45c274f9f4f535afe8957a8cf9ffecbd2b79026fba6c207111136d963f14",
"last_transaction_version": 371523890
},
{
"amount": 0,
"owner_address": "0x82117cc55459b4de3ffb371d014d164273e9f2795545ece7c55e8c76bced1e7a",
"last_transaction_version": 377597523
},
{
"amount": 0,
"owner_address": "0x24eea652ba98ed744267eee683805a6a53091a842ea914d4bc43785d7de90c6b",
"last_transaction_version": 371958547
},
{
"amount": 0,
"owner_address": "0x91d8b03c217aea6f2b36bc182dc34021ad6c81fa3dded74cb5240252c27e9657",
"last_transaction_version": 372285477
},
{
"amount": 0,
"owner_address": "0x7f6fd0671110708302d00c2c1549dec9e05183588eed95fdd4f3864e9524da9e",
"last_transaction_version": 374455789
},
{
"amount": 1,
"owner_address": "0xe18dec131fa7165f807d451c468028b1768d04ec52764cbd0234295d4ab8a08d",
"last_transaction_version": 374455789
}
]
}
}
According to the logic in the indexer, this token is currently owned by 0xe18dec131fa7165f807d451c468028b1768d04ec52764cbd0234295d4ab8a08d, but this result is wrong because the token was burned at version 377597523.
It seems the indexer doesn't process DeleteListingEvent, so the amount corresponding to 0xe18dec131fa7165f807d451c468028b1768d04ec52764cbd0234295d4ab8a08d was never updated.
more cases:
token id:
0xa72923d8863c8370d0673cca437262fcfbfa805844003fdb17d344f56e426e8b
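The stale amount = 1 row above can be reproduced by folding activity events into ownerships: if the burn/delete event is never applied, the last holder keeps a nonzero balance (a sketch with simplified event dicts and a truncated owner label, not the indexer's actual data model):

```python
def fold_ownerships(events, handle_burn=True):
    """Fold token activity events into an owner -> amount map, in version order.

    Each event is a dict with type, owner, delta, and version. If burn
    handling is skipped (as the issue suggests happens with
    DeleteListingEvent), the last holder's amount is never zeroed out.
    """
    amounts = {}
    for event in sorted(events, key=lambda e: e["version"]):
        if event["type"] == "burn" and not handle_burn:
            continue  # modelled bug: burn ignored, ownership row goes stale
        amounts[event["owner"]] = amounts.get(event["owner"], 0) + event["delta"]
    return amounts

events = [
    {"type": "deposit", "owner": "0xe18d...", "delta": 1, "version": 374455789},
    {"type": "burn", "owner": "0xe18d...", "delta": -1, "version": 377597523},
]
print(fold_ownerships(events, handle_burn=False))  # {'0xe18d...': 1} - stale
print(fold_ownerships(events, handle_burn=True))   # {'0xe18d...': 0} - correct
```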
Where can the value of indexer_api_key in the config.yaml configuration file be obtained from? Thanks!
Are there any updates on the self-hosted indexer API?
https://aptos.dev/indexer/api/self-hosted
I'm running processors which save data to a Postgres DB, and I would like to open an endpoint on top of it.
It seems no guide on this is provided yet.
If the NFT mint and burn actions are not too close in time, the burn event will be missing from the token_activities_v2 table for this NFT. The possible reason is that the hashmap token_v2_metadata_helper is constructed from write resources, but a burn event like this has no corresponding write resource, which breaks the processing.
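The failure mode can be seen with a small model of the helper: the metadata map is keyed by addresses seen in write resources of the current batch, so a burn whose token never got a write resource in that batch misses the lookup. A defensive fallback could consult previously indexed state instead of dropping the event (a sketch; all names are made up, not the processor's real API):

```python
def build_metadata_helper(write_resources):
    """Model of token_v2_metadata_helper: keyed by write-resource address."""
    return {wr["address"]: wr["data"] for wr in write_resources}

def process_burn(burn_event, helper, fetch_previous=None):
    """Look up token metadata for a burn event, with a fallback.

    When the burn arrives in a batch with no matching write resource
    (mint and burn far apart in time), the plain lookup fails; the
    hypothetical fetch_previous callback stands in for reading earlier
    indexed state rather than silently dropping the activity row.
    """
    metadata = helper.get(burn_event["token_address"])
    if metadata is None and fetch_previous is not None:
        metadata = fetch_previous(burn_event["token_address"])
    return metadata

helper = build_metadata_helper([])  # batch has the burn but no write resource
previously_indexed = {"0xtoken": {"name": "token"}}
print(process_burn({"token_address": "0xtoken"}, helper, previously_indexed.get))
```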