graphops / listener-radio
Monitor gossip on Graphcast
Home Page: https://docs.graphops.xyz/graphcast/radios/listener-radio
License: Apache License 2.0
Release process that includes a changelog, generates binaries for the package, and builds and pushes Docker images
Problem statement
We should add a query that returns high-level message stats for a specified Indexer (the Indexer address is provided as a query parameter when the query is sent).
Expectation proposal
The request itself can look something like this:
query GetIndexerStats($indexerId: String!) {
  getIndexerStats(indexerId: $indexerId) {
    messageCount
    distinctSubgraphCount
  }
}
The query resolver function on the API side can look like this:
// GraphQL resolver function
async fn get_indexer_stats(
    &self,
    ctx: &Context<'_>,
    indexer_id: String,
) -> Result<IndexerStats, HttpServiceError> {
    let pool = ctx.data_unchecked::<PgPool>();
    get_indexer_stats(pool, &indexer_id).await.map_err(|e| e.into())
}
And finally, the DB query function could look something like this:
pub async fn get_indexer_stats(
    pool: &PgPool,
    indexer_id: &str,
) -> Result<IndexerStats, anyhow::Error> {
    // Query to count the number of messages
    let message_count = sqlx::query!(
        "SELECT COUNT(*) as count FROM messages WHERE indexer_id = $1",
        indexer_id
    )
    .fetch_one(pool)
    .await?
    .count
    .unwrap_or(0);
    // Query to count the number of distinct subgraphs
    let distinct_subgraph_count = sqlx::query!(
        "SELECT COUNT(DISTINCT subgraph_id) as count FROM messages WHERE indexer_id = $1",
        indexer_id
    )
    .fetch_one(pool)
    .await?
    .count
    .unwrap_or(0);
    Ok(IndexerStats {
        message_count,
        distinct_subgraph_count,
    })
}
The names of variables, etc. are only examples and could be improved.
Of course, we also need to support a more specific time window that can be passed as a query parameter, rather than always using the global one. The global window should still be the default when the request doesn't provide one.
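The following in-memory Rust analogue of the two COUNT queries above shows roughly how an optional per-request window could fall back to the global one. StoredMessage, effective_window, indexer_stats and the seconds-based Unix timestamps are all hypothetical. In the real API, this filter would presumably become an extra timestamp condition in each SQL query.

```rust
use std::collections::HashSet;

/// Hypothetical in-memory stand-in for a stored message row.
#[derive(Debug, Clone)]
pub struct StoredMessage {
    pub indexer_id: String,
    pub subgraph_id: String,
    /// Unix timestamp in seconds (assumed).
    pub timestamp: u64,
}

#[derive(Debug, PartialEq, Eq)]
pub struct IndexerStats {
    pub message_count: i64,
    pub distinct_subgraph_count: i64,
}

/// Use the per-request window if one was given, otherwise the global default.
pub fn effective_window(requested_secs: Option<u64>, global_secs: u64) -> u64 {
    requested_secs.unwrap_or(global_secs)
}

/// Mirrors the two COUNT queries, restricted to messages at or after `now - window`.
pub fn indexer_stats(
    messages: &[StoredMessage],
    indexer_id: &str,
    now: u64,
    requested_window_secs: Option<u64>,
    global_window_secs: u64,
) -> IndexerStats {
    let window = effective_window(requested_window_secs, global_window_secs);
    // saturating_sub avoids underflow if the window reaches back past the epoch
    let cutoff = now.saturating_sub(window);

    let mut message_count = 0i64;
    let mut subgraphs = HashSet::new();
    for m in messages
        .iter()
        .filter(|m| m.indexer_id == indexer_id && m.timestamp >= cutoff)
    {
        message_count += 1;
        subgraphs.insert(m.subgraph_id.as_str());
    }

    IndexerStats {
        message_count,
        distinct_subgraph_count: subgraphs.len() as i64,
    }
}
```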
Problem statement
While there is an API function to delete messages, we should add a periodic pruning task to the listener radio.
Expectation proposal
Add config:
max_storage: the maximum number of generic messages to store. Prune messages starting from the smallest/earliest message ID until at most max_storage messages are left, i.e. prune IDs in [earliest_id..latest_id - max_storage].
message_pruning_age: prune messages that are too old, i.e. those where timestamp + message_pruning_age < current_time.
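The sketch below shows one way the two pruning rules could be combined. It assumes each stored message exposes an integer ID and a Unix timestamp in seconds; MessageMeta and ids_to_prune are hypothetical names. Instead of computing the literal ID range, it ranks messages newest-first. That way, gaps in the ID sequence (for example, after manual deletions through the API) cannot cause over- or under-pruning.

```rust
/// Hypothetical minimal view of a stored message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MessageMeta {
    pub id: i64,
    /// Unix timestamp in seconds (assumed).
    pub timestamp: i64,
}

/// Returns the IDs to prune, in ascending order. Either rule may be disabled with `None`.
pub fn ids_to_prune(
    messages: &[MessageMeta],
    max_storage: Option<usize>,
    message_pruning_age: Option<i64>,
    current_time: i64,
) -> Vec<i64> {
    // Sort newest-first by ID so the first `max_storage` entries are the ones to keep.
    let mut sorted: Vec<MessageMeta> = messages.to_vec();
    sorted.sort_by(|a, b| b.id.cmp(&a.id));

    let mut prune: Vec<i64> = sorted
        .iter()
        .enumerate()
        .filter(|(rank, m)| {
            // Capacity rule: everything past the newest `max_storage` messages.
            let over_capacity = max_storage.map_or(false, |max| *rank >= max);
            // Age rule: timestamp + message_pruning_age < current_time.
            let too_old = message_pruning_age.map_or(false, |age| m.timestamp + age < current_time);
            over_capacity || too_old
        })
        .map(|(_, m)| m.id)
        .collect();
    prune.sort_unstable();
    prune
}
```

When IDs are contiguous, the max_storage rule selects exactly the [earliest_id..latest_id - max_storage] range described above.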
Alternative considerations
Certain message types might have different retention preferences (for example, keeping upgrade_intent_message longer than public_poi_message). We may want a configuration matrix, but for now all types should be treated the same.
Additional context
graphops/graphcast-sdk#268
The Listener Radio plays a pivotal role in understanding a namespace in the Graphcast Network: it efficiently captures, validates, and records various types of radio payload messages, including public POI messages. Its main responsibility is to monitor the integrity and robustness of the network by listening to broadcasts and storing valid messages in a dedicated database.
Listener Radio remains constantly active and receptive to all messages broadcast within the network. It intercepts every message regardless of type, so no message goes unnoticed.
On receiving a message, the Listener Radio invokes a series of validation checks:
Once a message passes all validation checks, Listener Radio processes it for storage. The database stores every message type in a single generic JSONB column. This keeps the schema simple while still allowing messages to be retrieved and queried later.
The Listener Radio is equipped to handle various payload message types and provides a quick overview of the overall network.
Current Database Specifications
Generate daily/weekly summaries of captured and stored messages, aiding in monitoring and potential troubleshooting.
Feedback Mechanism: Allow Listener Radio to send acknowledgments back to the network, sharing its overall status with peer nodes.
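One simple approach to the daily/weekly summaries is to bucket stored messages by timestamp and count them per message type. The sketch below assumes Unix timestamps in seconds and uses a hypothetical function name. Buckets are aligned to the Unix epoch in UTC, so weekly buckets start on a Thursday.

```rust
use std::collections::BTreeMap;

const SECS_PER_DAY: u64 = 86_400;
const SECS_PER_WEEK: u64 = 7 * SECS_PER_DAY;

/// Counts messages per (bucket index, message type), where bucket index = timestamp / bucket_secs.
/// Input pairs are (Unix timestamp in seconds, message type name).
pub fn summarize(messages: &[(u64, &str)], bucket_secs: u64) -> BTreeMap<(u64, String), usize> {
    let mut counts = BTreeMap::new();
    for &(timestamp, msg_type) in messages {
        *counts
            .entry((timestamp / bucket_secs, msg_type.to_string()))
            .or_insert(0) += 1;
    }
    counts
}
```

Passing SECS_PER_DAY or SECS_PER_WEEK as the bucket size yields daily or weekly summaries, respectively.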
Problem statement
Manual work is needed to set up 3LA on the prod cluster
Expectation proposal
set-up
Alternative considerations
N/A
Additional context
N/A
After adding periodic network reconnection and timestamps, messages no longer stop when running natively, but they still stop when running in Docker.
Remove
Specific metric ideas
The 3LA should export a variety of metrics indicating the state of the system:
General ideas
Local scope (these can be APIs instead)
Network wide
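The sketch below shows one possible shape for exported metrics. It keeps process-local counters and renders them in the Prometheus text exposition format. Both the choice of Prometheus and the metric names are illustrative assumptions, not existing 3LA metrics.

```rust
use std::fmt::Write;
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical process-local counters for 3LA.
#[derive(Default)]
pub struct ListenerMetrics {
    pub messages_received: AtomicU64,
    pub messages_validated: AtomicU64,
    pub messages_stored: AtomicU64,
}

/// Increments a counter; Relaxed ordering is enough for independent counters.
pub fn inc(counter: &AtomicU64) {
    counter.fetch_add(1, Ordering::Relaxed);
}

impl ListenerMetrics {
    /// Renders every counter in the Prometheus text exposition format.
    pub fn render(&self) -> String {
        let counters = [
            ("listener_messages_received_total", "Messages received from the network", &self.messages_received),
            ("listener_messages_validated_total", "Messages that passed validation", &self.messages_validated),
            ("listener_messages_stored_total", "Messages written to the database", &self.messages_stored),
        ];
        let mut out = String::new();
        for (name, help, counter) in counters {
            // Writing to a String cannot fail, so unwrap is safe here.
            writeln!(out, "# HELP {name} {help}").unwrap();
            writeln!(out, "# TYPE {name} counter").unwrap();
            writeln!(out, "{name} {}", counter.load(Ordering::Relaxed)).unwrap();
        }
        out
    }
}
```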
Currently, 3LA receives all messages. We should provide a user config input to specify which radios to listen to and, more specifically, which content topics.
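Here is a sketch of what the user-facing filter could look like. It assumes the radio name and content topic of each incoming message are available as plain strings; ListenFilter and its fields are hypothetical. Leaving a field unset keeps the current behavior of listening to everything.

```rust
/// Hypothetical user config for what 3LA listens to; `None` means "no restriction".
#[derive(Debug, Default, Clone)]
pub struct ListenFilter {
    pub radios: Option<Vec<String>>,
    pub content_topics: Option<Vec<String>>,
}

impl ListenFilter {
    /// A message is accepted only if it matches both the radio and the topic allowlists.
    pub fn accepts(&self, radio_name: &str, content_topic: &str) -> bool {
        let radio_ok = self
            .radios
            .as_ref()
            .map_or(true, |radios| radios.iter().any(|r| r == radio_name));
        let topic_ok = self
            .content_topics
            .as_ref()
            .map_or(true, |topics| topics.iter().any(|t| t == content_topic));
        radio_ok && topic_ok
    }
}
```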
Later, we can use a DSL for more specific filters, such as
Make a variant of GraphcastAgent that does not need the filter protocol and can listen to a set of radios
Single JSONB column for all types of radio content: a static/generic SQL schema to keep things simple
Use the diesel/sqlx crate
With the GraphQL schema defined, create an API server for fetching data from 3LA
The basic one:
query {
  rows {
    id
    message {
      nonce
    }
  }
  messages {
    identifier
    nonce
    network
    blockNumber
    blockHash
    signature
    payload {
      identifier
      content
    }
  }
}
mutation DeleteMessage($id: Int!) {
  deleteMessage(id: $id) {
    identifier
    nonce
  }
}
Problem statement
After learning a few things in
Expectation proposal
General refactor
Alternative considerations
Problem statement
Currently the types are hardcoded in the types file
Expectation proposal
Problem statement
We need to add support to the GraphQL API of Listener Radio for querying "active Indexers" (Indexers who have sent at least 1 message on the subgraph-radio pubsub topic on the mainnet Graphcast namespace in the last X minutes). The time window will be configurable with a query parameter.
Expectation proposal
There is an existing function in Listener Radio's GraphQL API that supports fetching all messages:
// List messages but without filter options since msg fields are saved in jsonb
// Later flatten the messages to have columns from graphcast message.
async fn messages(
    &self,
    ctx: &Context<'_>,
) -> Result<Vec<GraphcastMessage<RadioPayloadMessage>>, HttpServiceError> {
    let pool = ctx.data_unchecked::<Pool<Postgres>>();
    let msgs: Vec<GraphcastMessage<RadioPayloadMessage>> = list_messages(pool)
        .await?
        .iter()
        .map(|r| r.get_message())
        .collect::<Vec<GraphcastMessage<RadioPayloadMessage>>>();
    Ok(msgs)
}
It uses the list_messages function from resolver.rs:
pub async fn list_messages<T>(pool: &PgPool) -> Result<Vec<Row<T>>, anyhow::Error>
where
    T: Clone + Serialize + DeserializeOwned + OutputType + std::marker::Unpin,
{
    let rows = sqlx::query_as!(
        Row,
        r#"
        SELECT id, message as "message: Json<T>"
        FROM messages
        ORDER BY id
        "#
    )
    .fetch_all(pool)
    .await
    .map_err(|e| {
        trace!("Database resolver connection error: {:#?}", e);
        e
    })?;
    Ok(rows)
}
This works well for fetching all messages. However, as the comment indicates, it does not support filtering. For the "active Indexers" query, we should add filtering support and create a new DB helper function that fetches a list of Indexers. We should then create a new method under QueryRoot that uses the new DB function and returns a bool indicating whether the Indexer referenced by the query (its address is specified in the query parameters) has been active in the last X minutes.
This new query will have to be modular, and all its parameters should be optional, so that the filtering logic itself is left entirely up to the client. We should support batch querying (providing multiple Indexer addresses in the query params). We should also support providing no Indexer addresses and getting back a list of all active Indexers. The time window is optional too: if it isn't provided, we should use a default (for instance, 1 day).
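The in-memory Rust sketch below illustrates the filtering semantics described above. With no addresses, it returns every active Indexer. With a list of addresses, it returns one bool per address. If no window is given, it falls back to the 1-day default. Messages are modeled as hypothetical (Indexer address, Unix timestamp in seconds) pairs, and addresses are compared as exact strings. A real implementation would push this filtering into the SQL query.

```rust
use std::collections::BTreeSet;

/// 1 day, the default window suggested above.
const DEFAULT_WINDOW_SECS: u64 = 24 * 60 * 60;

/// Indexers with at least one message at or after `now - window`.
pub fn active_indexers(
    messages: &[(String, u64)],
    now: u64,
    window_secs: Option<u64>,
) -> BTreeSet<String> {
    let cutoff = now.saturating_sub(window_secs.unwrap_or(DEFAULT_WINDOW_SECS));
    messages
        .iter()
        .filter(|(_, ts)| *ts >= cutoff)
        .map(|(addr, _)| addr.clone())
        .collect()
}

/// With addresses: one (address, is_active) pair per requested address, in request order.
/// Without addresses: every active Indexer, each paired with `true`.
pub fn active_indexer_statuses(
    messages: &[(String, u64)],
    indexers: Option<&[&str]>,
    now: u64,
    window_secs: Option<u64>,
) -> Vec<(String, bool)> {
    let active = active_indexers(messages, now, window_secs);
    match indexers {
        Some(addrs) => addrs
            .iter()
            .map(|addr| (addr.to_string(), active.contains(*addr)))
            .collect(),
        None => active.into_iter().map(|addr| (addr, true)).collect(),
    }
}
```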
Another thing we need to do is add support in Listener Radio for a "time window", i.e. only keeping messages in the DB that are younger than a given age (for example, 24 hours). This should be a global config argument. The argument should be optional, with a default of 7 days(?). It will not completely replace the current MAX_STORAGE; we should end up with the following: if MAX_STORAGE is provided, it is used IN COMBINATION with the time window. MAX_STORAGE therefore doesn't have a default value.
Aside from that, we should support an even more specific time frame on the query side, so that, for instance, GraphSeer could send a query like "fetch all Indexers who've sent messages in the past 1 hour".
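The sketch below shows how the global retention settings could be resolved, assuming the 7-day default proposed above. RetentionConfig and keeps are hypothetical names. MAX_STORAGE stays an Option with no default. When it is set, a message must satisfy both limits to be kept.

```rust
use std::time::Duration;

/// Hypothetical global retention settings.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetentionConfig {
    /// How long messages are kept; optional as a config argument, defaults to 7 days.
    pub time_window: Duration,
    /// Optional cap on the number of stored messages; intentionally has no default.
    pub max_storage: Option<usize>,
}

impl RetentionConfig {
    pub const DEFAULT_TIME_WINDOW: Duration = Duration::from_secs(7 * 24 * 60 * 60);

    pub fn new(time_window: Option<Duration>, max_storage: Option<usize>) -> Self {
        Self {
            time_window: time_window.unwrap_or(Self::DEFAULT_TIME_WINDOW),
            max_storage,
        }
    }

    /// `newer_messages` is the number of stored messages newer than this one.
    /// The time window always applies; MAX_STORAGE applies in combination when set.
    pub fn keeps(&self, age: Duration, newer_messages: usize) -> bool {
        let within_window = age < self.time_window;
        let within_cap = self.max_storage.map_or(true, |max| newer_messages < max);
        within_window && within_cap
    }
}
```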
Alternative considerations
Problem statement
Add graceful shutdown to the radio
Expectation proposal
The ideal process of shutting down
Alternative considerations
This may fit better in the re-architecture of radios, but it is a good learning opportunity for fixing the current structure.
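The sketch below shows one common graceful-shutdown pattern using only the standard library. Once a shutdown flag is raised, the radio stops accepting new messages and closes the channel. The worker then drains whatever is already queued before its thread is joined. The function names are hypothetical, and the sketch assumes the flag is set from a SIGINT/SIGTERM handler, which it does not show. An async radio would more likely use Tokio equivalents such as tokio::signal::ctrl_c and tokio::sync::mpsc.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Spawns a worker that processes messages until every Sender is dropped.
/// Iterating a Receiver still yields messages already buffered in the channel,
/// so nothing queued before shutdown is lost.
pub fn spawn_worker() -> (Sender<String>, JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel::<String>();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        for _msg in rx {
            // Placeholder for validating and storing the message.
            handled += 1;
        }
        // Reached only after the channel is closed and drained: release resources here.
        handled
    });
    (tx, handle)
}

/// Forwards incoming messages until the shutdown flag is raised, then closes the
/// channel and waits for the worker to finish. Returns how many messages were handled.
pub fn run_until_shutdown(
    incoming: impl IntoIterator<Item = String>,
    shutdown: Arc<AtomicBool>,
) -> usize {
    let (tx, worker) = spawn_worker();
    for msg in incoming {
        if shutdown.load(Ordering::SeqCst) {
            break;
        }
        if tx.send(msg).is_err() {
            break; // worker is gone; nothing left to deliver to
        }
    }
    drop(tx); // stop accepting work so the worker's loop can end
    worker.join().expect("worker thread panicked")
}
```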