graphops / listener-radio

Monitor gossip on Graphcast

Home Page: https://docs.graphops.xyz/graphcast/radios/listener-radio

License: Apache License 2.0

Shell 0.89% Dockerfile 2.08% Rust 97.03%
graph-protocol graphcast indexers monitor the-graph

listener-radio's People

Contributors

axiomatic-aardvark, chriswessels, hopeyen


listener-radio's Issues

Distribution support

A release process that includes a changelog, generates binaries for packages, and builds and pushes Docker images.

[Feat.Req] Add query for high level message stats per Indexer

Problem statement
We should add a query that returns high-level message stats for a specified Indexer (the Indexer address will be provided as a query parameter when the query is sent).

Expectation proposal
The request itself can look something like this:

query GetIndexerStats($indexerId: String!) {
  getIndexerStats(indexerId: $indexerId) {
    messageCount
    distinctSubgraphCount
  }
}

The query resolver function on the API side can look like this:

// GraphQL resolver function; the DB helper of the same name below would
// live in a separate module to avoid shadowing
async fn get_indexer_stats(
    &self,
    ctx: &Context<'_>,
    indexer_id: String,
) -> Result<IndexerStats, HttpServiceError> {
    let pool = ctx.data_unchecked::<PgPool>();
    get_indexer_stats(pool, &indexer_id).await.map_err(|e| e.into())
}

And finally the DB query function could be something similar:

pub async fn get_indexer_stats(
    pool: &PgPool,
    indexer_id: &str,
) -> Result<IndexerStats, anyhow::Error> {
    // Query to count the number of messages
    let message_count = sqlx::query!(
        "SELECT COUNT(*) as count FROM messages WHERE indexer_id = $1",
        indexer_id
    )
    .fetch_one(pool)
    .await?
    .count
    .unwrap_or(0);

    // Query to count the number of distinct subgraphs
    let distinct_subgraph_count = sqlx::query!(
        "SELECT COUNT(DISTINCT subgraph_id) as count FROM messages WHERE indexer_id = $1",
        indexer_id
    )
    .fetch_one(pool)
    .await?
    .count
    .unwrap_or(0);

    Ok(IndexerStats {
        message_count,
        distinct_subgraph_count,
    })
}

The names of variables, etc. are only examples and could be improved.

We also need to support a more specific time window passed as a query parameter, rather than relying only on the global one (though the global window should be used by default when none is provided in the request).
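As a sketch of what the time-windowed variant might look like, the query above could take optional bounds; the `from`/`to` parameter names here are assumptions for illustration, not the existing API:

```graphql
query GetIndexerStats($indexerId: String!, $from: Int, $to: Int) {
  getIndexerStats(indexerId: $indexerId, from: $from, to: $to) {
    messageCount
    distinctSubgraphCount
  }
}
```

When `$from`/`$to` are omitted, the resolver would fall back to the global time window.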

[Feat.Req] Pruning msgs from db

Problem statement

While there is an API function to delete messages, we should add a periodic pruning task to the Listener Radio.

Expectation proposal

Add config

  • max_storage: the maximum number of stored generic messages; prune from the smallest/earliest message ID until at most max_storage messages remain, i.e. remove [earliest_id..latest_id-max_storage]
  • message_pruning_age: prune messages older than a given age, i.e. those where timestamp + pruning_age < current_time
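The two pruning rules above can be expressed as small pure functions. This is a minimal sketch with hypothetical names; the real pruning task would apply these decisions via DELETE queries against the messages table:

```rust
/// Given the lowest and highest stored message IDs, return the first ID that
/// should be KEPT so that at most `max_storage` messages remain. Everything
/// below the returned cutoff is prunable.
fn prune_cutoff_id(earliest_id: i64, latest_id: i64, max_storage: i64) -> i64 {
    // Keep the newest `max_storage` IDs: [latest_id - max_storage + 1 ..= latest_id].
    let cutoff = latest_id - max_storage + 1;
    cutoff.max(earliest_id)
}

/// Age-based rule from the config sketch: a message is prunable when
/// `timestamp + pruning_age < current_time`.
fn is_expired(timestamp: i64, pruning_age: i64, now: i64) -> bool {
    timestamp + pruning_age < now
}
```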

Alternative considerations
Certain message types might warrant different preferences (such as keeping upgrade_intent_message longer than public_poi_message). We may want a configuration matrix, but treat them all the same for now.

Additional context
graphops/graphcast-sdk#268

[Feat.Req] Message pipelining

Listener Radio

Problem Statement

To efficiently capture, validate, and record various types of radio payload messages, including public POI messages, the Listener Radio plays a pivotal role in understanding a namespace in the Graphcast Network. Its main responsibility is to monitor the integrity and robustness of the network by effectively listening to broadcasts and storing valid messages in a dedicated database.

Capturing Broadcast Messages

Listener Radio remains constantly active and receptive to all broadcasted messages within the Network. Irrespective of the message type, Listener Radio intercepts each message, ensuring no message goes unnoticed.

  • TODO: Listener radio currently stores a set of supported message types. It should eventually store all messages with compatible traits for GraphcastMessages.

Validating the Messages

On receiving a message, the Listener Radio invokes a series of validation checks:

  1. Authentication: Ensures that the message is from a trusted and legitimate source within the network.
  2. Integrity: Confirms that the message hasn't been altered or tampered with during transmission.
  3. Duplication: Checks the database to ensure the message hasn't been recorded previously, preventing double entries.
  4. Nonce check: Ensures a linear ordering of messages.
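As an illustration of checks 3 and 4, duplication and nonce ordering can be enforced by tracking the last accepted nonce per sender. This is a hedged sketch with invented names, not the radio's actual implementation; authentication and integrity checks would run before this stage and are elided:

```rust
use std::collections::HashMap;

/// Tracks the last accepted nonce per sender so that replays (duplication)
/// and out-of-order messages (nonce check) are both rejected.
struct NonceTracker {
    last_nonce: HashMap<String, u64>,
}

impl NonceTracker {
    fn new() -> Self {
        Self { last_nonce: HashMap::new() }
    }

    /// Accept the message only if its nonce is strictly greater than the
    /// last one seen from this sender, enforcing a linear ordering.
    fn check_and_record(&mut self, sender: &str, nonce: u64) -> bool {
        match self.last_nonce.get(sender) {
            Some(&last) if nonce <= last => false, // duplicate or out of order
            _ => {
                self.last_nonce.insert(sender.to_string(), nonce);
                true
            }
        }
    }
}
```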

Storing Validated Messages

Once a message passes all validation checks, Listener Radio processes it for storage. The storage database is structured to store different types of messages in a general JSONB column, ensuring quick retrievals and queries in the future.

  • Implement an optimization routine to periodically prune older messages, freeing up storage and maintaining database performance.

Message services

The Listener Radio is equipped to handle various payload message types and provides a quick glance at the overall network:

  1. Network Metrics: Periodic summary about network health, peer counts, active topics, and other vital statistics.
  2. HTTP server: Provide API to query overall or specific message entries

Expectation Proposal

Current Database Specifications

  1. Message ID: A unique identifier for each captured message.
  2. Message: A JSON object that is an exact copy of the message from the network.
  • Add potential new fields
    1. Source: Details about the sender or broadcaster of the message.
    2. Type: Classifies the message (e.g., Public POI, Upgrade intent, etc.)
    3. Validation Timestamp: The exact time when the message was validated by the Listener Radio.

Automated Reporting

Generate daily/weekly summaries of captured and stored messages, aiding in monitoring and potential troubleshooting.

  • Add a set of metrics
  1. Total Messages Captured
    Provides an overall view of the network activity in terms of the number of messages broadcasted and captured by the Listener Radio.
  2. Breakdown of Message Types
    Display the number of messages categorized by type to offer a deeper understanding of the nature of network activity.
  3. Validation Success Rate
    Percentage of messages that successfully passed the validation checks against those that failed, gauging the integrity of broadcasts in the network.
  4. Database Storage Utilization
    Display the current storage usage in terms of total messages stored and available space, such that users can adjust their pruning preferences.
  5. Top Active Broadcasters
    List the top entities or nodes that are most active in broadcasting messages. Helps identify primary sources of information or potential spamming entities.
  6. Average Message Processing Time
    The mean time taken to capture, validate, and store each message as an indicator of the efficiency and responsiveness of the Listener Radio.
  7. Recent Message Log
    Displaying the last few messages with their type, source, and timestamp provides a quick snapshot of recent network activity.
  8. Daily/Weekly Message Volume Trends
    Graphical representation of the number of messages captured over time (daily, weekly).
  9. Failed Message Analysis
    Provides reasons or categories for messages that failed validation (e.g., authentication failure, duplication, unsupported messages), for troubleshooting or identifying malicious activities.
  10. Network Latency Metrics
    Average time taken for a message to travel from the broadcaster to the Listener Radio, helping users understand network performance and potential delays.
  11. Messages Awaiting Storage
    Count of messages that have been validated but are pending storage in the database; indicates potential bottlenecks in storage or processing.
  12. Unique Broadcasters Count
    Total number of unique entities or nodes broadcasting messages.
    Gives a sense of the diversity and size of the active network participants.
  13. Database Pruning Events
    Track when and how many messages were pruned from the database.

Alternative Considerations

Feedback Mechanism: Allow Listener Radio to send acknowledgments back to the network, sharing an overall status to the peer nodes.

Message ingestion stops

After periodic network reconnection and added timestamps, messages no longer stop when running natively, but in Docker the messages still stop.

Generate basic graphcast message metrics and summaries

Specific metric ideas
The 3LA should export a variety of metrics indicating the state of the system:

  • number_of_remote_messages
    • labels: deployment hash, (potentially also deployment_network, graphcast_network, sender_address)
    • value: Count of remote messages
    • use cases: visualize the traffic
  • indexer_count / gossip peer count
    • labels: deployment hash, (potentially also network)
    • value: Count of unique indexers attached to each message
    • use cases: visualize the number of participants/size of the network
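The labelled counters above could be backed by a real metrics library such as `prometheus`; as a dependency-free sketch, here is the same idea with a plain map keyed by metric name and deployment hash (all names are illustrative):

```rust
use std::collections::HashMap;

/// Minimal labelled-counter registry: each counter is keyed by a
/// (metric name, deployment hash) pair, mirroring the label sets above.
#[derive(Default)]
struct RadioMetrics {
    counters: HashMap<(String, String), u64>,
}

impl RadioMetrics {
    /// Increment e.g. `number_of_remote_messages` for one deployment.
    fn inc(&mut self, metric: &str, deployment_hash: &str) {
        *self
            .counters
            .entry((metric.to_string(), deployment_hash.to_string()))
            .or_insert(0) += 1;
    }

    /// Read a counter back; missing counters report zero.
    fn get(&self, metric: &str, deployment_hash: &str) -> u64 {
        *self
            .counters
            .get(&(metric.to_string(), deployment_hash.to_string()))
            .unwrap_or(&0)
    }
}
```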

General ideas

  • Local scope (These can be APIs instead)

    • How many of radios are tapped into?
    • How many content topics are subscribed for each radio?
  • Network wide

    • How many content topics are being actively exchanged?
    • How many messages are sent across topics?
    • How many indexers are participating across the topics?

Use database for message storage

Single jsonb column for all types of radio content - a static/generic SQL schema to keep things easy

  • PostgreSQL with diesel/sqlx crate
  • use the jsonb type in PostgreSQL to store JSON data, and use diesel to execute queries against it
  • diesel also provides a built-in way to serialize and deserialize Rust structs to and from JSON.
  • integrate the storage interface into the database crate, using transactions for writes
  • matches incoming message with simple DSL statements
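A minimal sketch of the schema this implies, assuming the table and column names used elsewhere in these issues (`messages`, `id`, `message`):

```sql
-- One generic table for all radio content; the payload lives in a jsonb column.
CREATE TABLE messages (
    id      BIGSERIAL PRIMARY KEY,
    message JSONB NOT NULL
);

-- A GIN index lets queries filter on fields inside the jsonb payload.
CREATE INDEX messages_message_gin ON messages USING GIN (message);
```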

Host http server and expose API endpoint

With GraphQL schema defined, create API server for fetching data from 3LA

The basic one:

query {
  rows{
    id
    message{
      nonce
    }
  }
  
  messages{
    identifier
    nonce
    network
    blockNumber
    blockHash
    signature
    payload{
      identifier
      content
    }
  }
}

mutation DeleteMessage($id: Int!) {
  deleteMessage(id: $id) {
    identifier
    nonce
  }
}

[Feat.Req] More generalized radio message types

Problem statement

Currently the types are hardcoded in the types file

Expectation proposal

  • General radio message type - Trait object
  • Read in a protobuf file URL that can be parsed into types

[Feat.Req] Add query for fetching active Indexers

Problem statement
We need to add support for querying for "active Indexers" (Indexers who have sent at least 1 message on the subgraph-radio pubsub topic on the mainnet Graphcast namespace in the last X minutes) to the GraphQL API of Listener Radio. The time window will be configurable with a query parameter.

Expectation proposal
There is an existing function in Listener Radio's GraphQL API that supports fetching all messages:

    // List messages but without filter options since msg fields are saved in jsonb
    // Later flatten the messages to have columns from graphcast message.
    async fn messages(
        &self,
        ctx: &Context<'_>,
    ) -> Result<Vec<GraphcastMessage<RadioPayloadMessage>>, HttpServiceError> {
        let pool = ctx.data_unchecked::<Pool<Postgres>>();

        let msgs: Vec<GraphcastMessage<RadioPayloadMessage>> = list_messages(pool)
            .await?
            .iter()
            .map(|r| r.get_message())
            .collect::<Vec<GraphcastMessage<RadioPayloadMessage>>>();
        Ok(msgs)
    }

It utilizes the list_messages function from resolver.rs:

pub async fn list_messages<T>(pool: &PgPool) -> Result<Vec<Row<T>>, anyhow::Error>
where
    T: Clone + Serialize + DeserializeOwned + OutputType + std::marker::Unpin,
{
    let rows = sqlx::query_as!(
        Row,
        r#"
SELECT id, message as "message: Json<T>"
FROM messages
ORDER BY id
        "#
    )
    .fetch_all(pool)
    .await
    .map_err(|e| {
        trace!("Database resolver connection error: {:#?}", e);
        e
    })?;

    Ok(rows)
}

and it works great for fetching all messages. However, as indicated by the comment, it does not support filtering. For the "active Indexers" query we should add filtering support and create a new DB helper function that fetches a list of Indexers. Then we should create a new method under QueryRoot that uses the new DB function and returns a bool indicating whether the Indexer that the query references (the address will be specified in the query parameters) has been active in the last X minutes.

This new query will have to be modular and all its parameters should be optional, so that the filtering logic itself is left entirely up to the client. We should support batch querying (providing multiple Indexer addresses in the query params). We should also support providing no Indexer addresses and getting back a list of all active Indexers. The time window is also optional, because if it's not provided we should use a default (1 day for instance).

Another thing we need to do is add support in Listener Radio for a "time window" i.e. being able to only keep messages in the db that are less than 24 hours old. This should be a global config argument. This argument should be optional, but have a default of 7 days(?). It will not completely replace the current MAX_STORAGE, we should end up with:

  1. Time window config argument is the pruning mechanism that is used by default, it has a default value (7 days?).
  2. If MAX_STORAGE is provided then that is used IN COMBINATION with the time window. MAX_STORAGE therefore doesn't have a default value.

Aside from that, we should support an even more specific time frame on the query side, so that for instance GraphSeer could send a query like "fetch all Indexers who've sent messages in the past 1 hour".
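The filtering semantics described above (optional address batch, optional window with a 1-day default) can be sketched as a pure function. Everything here is hypothetical naming; in practice this logic would live in a SQL WHERE clause rather than in Rust:

```rust
/// Default window when the client omits one, per the proposal (1 day).
const DEFAULT_WINDOW_SECS: i64 = 86_400;

/// Given (indexer address, last message timestamp) pairs, return the
/// addresses active within the requested window. An empty `addresses`
/// slice means "all Indexers"; a missing window uses the default.
fn active_indexers(
    rows: &[(String, i64)],
    addresses: &[String],
    window_secs: Option<i64>,
    now: i64,
) -> Vec<String> {
    let window = window_secs.unwrap_or(DEFAULT_WINDOW_SECS);
    rows.iter()
        .filter(|(addr, ts)| {
            (addresses.is_empty() || addresses.contains(addr)) && now - ts < window
        })
        .map(|(addr, _)| addr.clone())
        .collect()
}
```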

Alternative considerations

  • We could just add a query function that returns all messages that have been sent by the specific Indexer, and store that list in GraphSeer's state for use in other use cases, but in my opinion it's better to keep these functions modular. The more "burden" we put on Listener Radio for filtering and conditionals, the better.

[Bug] program termination

Problem statement

Add graceful shutdown to the radio

Expectation proposal

The ideal process of shutting down

  • First the radio stops listening to waku signals: Graphcast agent's waku node should stop
  • message senders between Graphcast agent and radio operator should drop as the already received signals are processed
  • All graphcast gossip operations shut down
  • message processing should finish decoding, validating, and storing the messages
  • radio operations should stop
  • http server shuts down
  • metrics shuts down
  • program exits
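The ordered teardown above hinges on dropping the sender side of each channel so downstream stages drain and exit naturally. This is a toy sketch of that pattern using std threads and channels in place of the radio's real tasks; stage names are illustrative:

```rust
use std::sync::mpsc;
use std::thread;

/// Simulates the shutdown order: the "waku node" sends its last signals and
/// drops its sender; the message-processing stage drains the channel and
/// finishes; then the remaining services stop in sequence.
fn run_shutdown_sequence() -> Vec<&'static str> {
    let (tx, rx) = mpsc::channel::<&'static str>();
    // Message-processing stage: drains remaining messages, then finishes.
    let processor = thread::spawn(move || {
        let mut log = Vec::new();
        for msg in rx {
            log.push(msg); // decoding/validating/storing would happen here
        }
        log.push("message processing stopped");
        log
    });
    // "Waku node" stage: last already-received signals, then sender drops.
    tx.send("signal 1").unwrap();
    tx.send("signal 2").unwrap();
    drop(tx); // waku node stops listening; the channel closes
    let mut log = processor.join().unwrap();
    log.push("http server stopped");
    log.push("metrics stopped");
    log
}
```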

Alternative considerations

This may fit better in the re-architecture of radios, but it is a good learning opportunity for fixing the current structure
