
pinax-network / substreams-sink-websockets


Substreams Sink Websockets

Home Page: https://substreams-sink-websockets-production.up.railway.app

License: MIT License

TypeScript 94.14% Dockerfile 0.20% JavaScript 2.96% HTML 2.70%
substreams thegraph

substreams-sink-websockets's Introduction

Substreams Sink WebSockets

substreams-sink-websockets is a tool that allows developers to pipe data extracted from a blockchain to WebSockets.

WebSockets API

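| Method    | Params              | Description                                         |
|-----------|---------------------|-----------------------------------------------------|
| ping      |                     | Test connectivity to the WebSocket API.             |
| time      |                     | Get the current server time from the WebSocket API. |
| subscribe | {chain, moduleHash} | Subscribe to a Substreams module hash by chain.     |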

Send Text Message with the following JSON payloads

subscribe Request

{
    "method": "subscribe",
    "params": {
        "chain": "bsc",
        "moduleHash": "0a363b2a63aadb76a525208f1973531d3616fbae"
    }
}

ping Request

{
    "method": "ping"
}
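
As a minimal sketch of how a client might build these payloads before sending them as text frames (the helper names `subscribeRequest` and `pingRequest` are illustrative and not part of the project):

```typescript
// Hypothetical helpers (not from the project) that build the text-frame payloads above.
type SubscribeParams = { chain: string; moduleHash: string };

function subscribeRequest(params: SubscribeParams): string {
  return JSON.stringify({ method: "subscribe", params });
}

function pingRequest(): string {
  return JSON.stringify({ method: "ping" });
}

// With a standard WebSocket client this string would be passed to ws.send(...).
const payload = subscribeRequest({
  chain: "bsc",
  moduleHash: "0a363b2a63aadb76a525208f1973531d3616fbae",
});
```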

REST API

| Pathname                                | Description                                   |
|-----------------------------------------|-----------------------------------------------|
| GET /health                             | Health check                                  |
| GET /metrics                            | Prometheus metrics                            |
| GET /chain                              | Returns all available chains                  |
| GET /traceId                            | Returns all traceIds by chain                 |
| GET /moduleHash                         | Returns all available moduleHashes            |
| GET /moduleHashByChain                  | Returns all available moduleHashes by chain   |
| GET /openapi                            | Returns API documentation in JSON format      |
| GET /messages                           | Returns the most recent messages              |
| POST / {timestamp, signature, body}     | Webhook HTTP POST (Ed25519 signature)         |
| POST / {"message": "PING"}              | Webhook HTTP POST Ping                        |

Parameters

/messages

| Parameter  | Type   | Description                                                   |
|------------|--------|---------------------------------------------------------------|
| chain      | string | Filter results by chain name, cannot be used with distinct    |
| moduleHash | string | Filter results by module hash                                 |
| limit      | int    | Limit number of results shown with a maximum value of 50      |
| sort       | string | Sort by asc (ascending) or desc (descending)                  |
| distinct   | bool   | If set to true, will return list of results distinct by chain |

Example Request

/messages?chain=value1&moduleHash=value2&limit=value3&sort=value4
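
The constraints in the parameter table can be checked on the client before the request is sent. The sketch below is a hypothetical helper (`messagesPath` is not part of the project); the lower bound of 1 for `limit` is an assumption, and how the server itself reacts to invalid values is not documented here.

```typescript
// Hypothetical client-side helper; parameter names come from the table above.
type MessagesQuery = {
  chain?: string;
  moduleHash?: string;
  limit?: number;
  sort?: "asc" | "desc";
  distinct?: boolean;
};

function messagesPath(query: MessagesQuery): string {
  // Documented rule: chain cannot be combined with distinct.
  if (query.chain !== undefined && query.distinct) {
    throw new Error("chain cannot be used with distinct");
  }
  // Documented maximum of 50; the minimum of 1 is an assumption.
  if (
    query.limit !== undefined &&
    (!Number.isInteger(query.limit) || query.limit < 1 || query.limit > 50)
  ) {
    throw new Error("limit must be an integer between 1 and 50");
  }
  const parts: string[] = [];
  for (const [key, value] of Object.entries(query)) {
    if (value !== undefined) {
      parts.push(`${key}=${encodeURIComponent(String(value))}`);
    }
  }
  return parts.length > 0 ? `/messages?${parts.join("&")}` : "/messages";
}
```

For example, `messagesPath({ chain: "bsc", limit: 10, sort: "desc" })` yields `/messages?chain=bsc&limit=10&sort=desc`.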

WebSockets examples

Requirements

Quickstart

$ bun install
$ bun dev

Linux Only

$ wget https://github.com/pinax-network/substreams-sink-websockets/releases/download/v0.1.7/substreams-sink-websockets
$ chmod +x ./substreams-sink-websockets

.env Environment variables

# required
PUBLIC_KEY="<Ed25519 public key>"

# optional
PORT=3000
SQLITE_FILENAME=db.sqlite
HOSTNAME=0.0.0.0
VERBOSE=true

Help

$ substreams-sink-websockets --help

Usage: substreams-sink-websockets [options]

Substreams Sink Websockets

Options:
  --public-key <string>          (required) Ed25519 public key (comma-separated for multiple public keys) (env: PUBLIC_KEY)
  --port <int>                   Server listen on HTTP port (default: 3000, env: PORT)
  --hostname <string>            Server listen on HTTP hostname (default: "0.0.0.0", env: HOSTNAME)
  --sqlite-filename <string>     SQLite database filename (default: "db.sqlite", env: SQLITE_FILENAME)
  --verbose <boolean>            Enable verbose logging (default: false, env: VERBOSE)
  --recent-messages-limit <int>  Limit recent messages (default: 50, env: RECENT_MESSAGES_LIMIT)
  -V, --version                  output the version number
  -h, --help                     display help for command
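
The options and defaults listed above can be summarized as a small configuration loader. This is only an illustrative sketch: `loadConfig` and the `Config` type are hypothetical names, and the project's actual parsing (done through its CLI) may differ.

```typescript
// Defaults mirror the --help output; the parsing itself is an illustrative assumption.
type Config = {
  publicKeys: string[];
  port: number;
  hostname: string;
  sqliteFilename: string;
  verbose: boolean;
  recentMessagesLimit: number;
};

function loadConfig(env: Record<string, string | undefined>): Config {
  // PUBLIC_KEY may hold several comma-separated Ed25519 public keys.
  const publicKeys = (env["PUBLIC_KEY"] ?? "")
    .split(",")
    .map((key) => key.trim())
    .filter((key) => key.length > 0);
  if (publicKeys.length === 0) {
    throw new Error("PUBLIC_KEY is required");
  }
  return {
    publicKeys,
    port: Number(env["PORT"] ?? 3000),
    hostname: env["HOSTNAME"] ?? "0.0.0.0",
    sqliteFilename: env["SQLITE_FILENAME"] ?? "db.sqlite",
    verbose: env["VERBOSE"] === "true",
    recentMessagesLimit: Number(env["RECENT_MESSAGES_LIMIT"] ?? 50),
  };
}
```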

Docker environment

Pull from GitHub Container registry

docker pull ghcr.io/pinax-network/substreams-sink-websockets:latest

Build from source

docker build -t substreams-sink-websockets .

Run with .env file

docker run -it --rm --env-file .env ghcr.io/pinax-network/substreams-sink-websockets


substreams-sink-websockets's People

Contributors

deniscarriere, samuelpapineau


substreams-sink-websockets's Issues

Deploy Service using `Replicas` on Railway

Deploy WebSocket sink on Railway using Replicas

Scope

  • Deploy 2x Webhook containers to push to WebSocket service (ex: bsc & polygon)
  • Deploy Websockets container using multiple Replicas (ex: 3)
  • Connect multiple users to Websocket endpoint
    • monitor if connection is dropped or not
    • monitor if duplicate messages are received
  • Mount db.sqlite to be shared between all replicas
  • Read Prometheus metrics via SQLite database (in memory might not work with replicas)

Add Commander CLI

  • implement Commander to include arguments via CLI

example

$ bun index.ts --port 8888

Request format

Request format

https://developers.binance.com/docs/binance-trading-api/websocket_api#request-format

  • Requests must be sent as JSON in text frames, one request per frame.
  • All timestamps are in milliseconds in UTC, unless noted otherwise.
  • All field names and values are case-sensitive, unless noted otherwise.

Example

{
  "id": "5494febb-d167-46a2-996d-70533eb4d976",
  "method": "exchangeInfo",
  "params": {
    "symbols": ["BNBBTC"]
  }
}

Code example for ID

import { randomUUID } from "crypto";

const id = randomUUID()
// => "f25c1ccc-3ed9-4cfb-9f71-555df546d7fe"
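
On the server side, the rules above (JSON in a text frame, one request per frame) suggest a small validation step before dispatching on `method`. The following is a sketch under assumptions: `parseRequest` is a hypothetical name, and treating `id` as an optional string is a simplification for this example.

```typescript
// Hypothetical validator for the request envelope described above.
type WsRequest = {
  id: string | undefined;
  method: string;
  params: Record<string, unknown> | undefined;
};

function parseRequest(frame: string): WsRequest {
  let value: unknown;
  try {
    value = JSON.parse(frame);
  } catch {
    throw new Error("request must be valid JSON");
  }
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    throw new Error("request must be a JSON object");
  }
  const { id, method, params } = value as Record<string, unknown>;
  if (typeof method !== "string") {
    throw new Error("method is required");
  }
  if (id !== undefined && typeof id !== "string") {
    throw new Error("id must be a string");
  }
  if (params !== undefined && (typeof params !== "object" || params === null)) {
    throw new Error("params must be an object");
  }
  return {
    id: id as string | undefined,
    method,
    params: params as Record<string, unknown> | undefined,
  };
}
```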

issue when running Docker

$ docker run -it --rm --env-file .env ghcr.io/pinax-network/substreams-sink-websockets run

WARNING: The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested
Server listening on http://72416cbe4f18:3000
Verifying with PUBLIC_KEY a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9
Reading SQLITE_FILENAME db.sqlite
 5 | export type KV = {key: string, value: string|number};
 6 | 
 7 | export function createDb(filename: string) {
 8 |     // create folder if does not exists
 9 |     if ( !fs.existsSync(filename) ) {
10 |         fs.mkdirSync(path.parse(filename).dir, {recursive: true});
            ^
ENOENT: No such file or directory
 syscall: "mkdir"
   errno: -2

      at createDb (/home/bun/app/src/sqlite.ts:10:8)
      at /home/bun/app/index.ts:17:11

Response format

Scope

  • investigate if including id is possible in Bun (we don't seem to have a way to inject user specific data to ws.send())
  • Response message with the following:
    • replace result: {} => data: {}
{
  id,
  status,
  data
}

Example

{
  "id": "187d3cb2-942d-484c-8271-4e2141bbadb1",
  "status": 200,
  "data": {
    "serverTime": 1656400526260
  }
}
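
A response in this shape could be produced by a helper that echoes the request `id` back to the caller. This is a minimal sketch; `makeResponse` is a hypothetical name rather than a function from the project.

```typescript
// Hypothetical builder for the { id, status, data } response envelope.
type WsResponse<T> = { id: string; status: number; data: T };

function makeResponse<T>(id: string, status: number, data: T): string {
  const response: WsResponse<T> = { id, status, data };
  return JSON.stringify(response);
}

// Example: answering a "time" request with the current server time.
const timeResponse = makeResponse("187d3cb2-942d-484c-8271-4e2141bbadb1", 200, {
  serverTime: Date.now(),
});
```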

Subscribe to ModuleHash by chain

Send Text Message with the following JSON payloads

subscribe Request

{
    "method": "subscribe",
    "params": {
        "chain": "bsc",
        "moduleHash": "0a363b2a63aadb76a525208f1973531d3616fbae"
    }
}

Store recent messages and expose as GET endpoint (`/messages`)

When a user connects for the first time, they have no context about previous messages.

The goal is to store the latest X messages (X configured via .env) in the SQLite DB.

Expose a GET endpoint for the user to fetch recent messages.

  • Store messages in SQLite by:
    • module hash
    • module hash + chain
  • Prune older messages by (X timestamp or X block number)
  • Expose GET endpoint /messages to return all recent messages with params:
    • filter by chain
    • filter by module hash
    • LIMIT
    • DISTINCT by chain (only show 1 message per chain)
    • SORT BY DESC as default (recent messages first)
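
The behaviour described in this list can be sketched with an in-memory store. This is an illustration only: the issue proposes SQLite, whereas the hypothetical `RecentMessages` class below uses an array and prunes by message count rather than by timestamp or block number.

```typescript
// In-memory stand-in for the proposed SQLite table (assumption for illustration).
type StoredMessage = { chain: string; moduleHash: string; timestamp: number; body: string };
type MessageFilter = {
  chain?: string;
  moduleHash?: string;
  limit?: number;
  sort?: "asc" | "desc";
  distinct?: boolean;
};

class RecentMessages {
  private readonly maxSize: number;
  private messages: StoredMessage[] = [];

  constructor(maxSize: number) {
    this.maxSize = maxSize;
  }

  add(message: StoredMessage): void {
    this.messages.push(message);
    // Prune the oldest entries once the cap is exceeded.
    if (this.messages.length > this.maxSize) {
      this.messages.splice(0, this.messages.length - this.maxSize);
    }
  }

  query(filter: MessageFilter = {}): StoredMessage[] {
    const { chain, moduleHash, limit = this.maxSize, sort = "desc", distinct = false } = filter;
    // filter() returns a new array, so sorting it does not disturb the store.
    let rows = this.messages
      .filter((m) => chain === undefined || m.chain === chain)
      .filter((m) => moduleHash === undefined || m.moduleHash === moduleHash)
      .sort((a, b) => b.timestamp - a.timestamp);
    if (distinct) {
      // Keep only the most recent message per chain.
      const seen = new Set<string>();
      rows = rows.filter((m) => {
        if (seen.has(m.chain)) return false;
        seen.add(m.chain);
        return true;
      });
    }
    if (sort === "asc") rows.reverse();
    return rows.slice(0, limit);
  }
}
```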

Add support to register by chain

Allow user to subscribe to Websockets using the following parameters:

  • moduleHash
  • chain

Example payload

{
    "method": "subscribe",
    "params": {
        "moduleHash": "0a363b2a63aadb76a525208f1973531d3616fbae",
        "chain": "polygon"
    }
}

Send message Topic

  • <chain>:<moduleHash>
  • <moduleHash>
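
A minimal sketch of how these two topic names might be derived (function names are hypothetical). In Bun, a connection joins a topic with `ws.subscribe(topic)` and the server broadcasts with `server.publish(topic, data)`; how the project wires these together is not shown here.

```typescript
// Topic a client joins: chain-scoped when a chain is given, otherwise module-wide.
function subscriptionTopic(moduleHash: string, chain?: string): string {
  return chain ? `${chain}:${moduleHash}` : moduleHash;
}

// Topics an incoming message is published to, so both kinds of subscribers receive it.
function publishTopics(chain: string, moduleHash: string): string[] {
  return [`${chain}:${moduleHash}`, moduleHash];
}
```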

Scope

  • include server.publish for additional parameters
    • moduleHash
    • chain
  • Add new Prometheus Metrics to track total messages published for:
    • moduleHash
    • chain
  • GET API to know which params are supported:
    • /moduleHash
    • /chain

Enforce Origin Header

  • Reject requests without Origin Header

All browsers send an Origin header. You can use this header for security (checking for same origin, automatically allowing or denying, etc.) and send a 403 Forbidden if you don't like what you see. However, be warned that non-browser agents can send a faked Origin. Most applications reject requests without this header.
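
The check could look roughly like the sketch below. `originStatus` is a hypothetical helper, and the optional allow-list is an assumption that goes beyond the issue, which only asks to reject requests with no Origin header.

```typescript
// Returns the HTTP status to use for a connection attempt based on its Origin header.
function originStatus(origin: string | null, allowed?: ReadonlySet<string>): number {
  // Reject requests that carry no Origin header at all.
  if (origin === null || origin.trim() === "") return 403;
  // Assumption: when an allow-list is configured, unknown origins are rejected too.
  if (allowed !== undefined && !allowed.has(origin)) return 403;
  return 200;
}
```

Because non-browser clients can send any Origin value, this check filters casual misuse but is not an authentication mechanism.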

Test connectivity `ping`

Request

{
  "id": "922bcc6e-9de8-440d-9e84-7c80933a8d0d",
  "method": "ping"
}

Response

{
  "id": "922bcc6e-9de8-440d-9e84-7c80933a8d0d",
  "status": 200,
  "result": {}
}

Enforce connection rate limits

https://developers.binance.com/docs/binance-trading-api/websocket_api#connection-limits

  • There is a limit of 300 connections per attempt every 5 minutes.
  • The connection is per IP address.

Rate limits are accounted by intervals.

For example, a 1 MINUTE interval starts every minute. Request submitted at 00:01:23.456 counts towards the 00:01:00 minute's limit. Once the 00:02:00 minute starts, the count will reset to zero again.

Feature

  • store IP address in SQLite by incrementing each connection attempt, with the following fields:
    • IP address
    • connection attempts
    • last timestamp

Will need to extend SQLite data format to include 3 fields key/value/timestamp
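
The interval-based accounting described above can be sketched as a fixed-window counter. The example keeps rows in a `Map` instead of SQLite, and `ConnectionLimiter` is a hypothetical name; the row fields mirror the proposed IP address, connection attempts and last timestamp.

```typescript
// One row per IP address: attempts in the current interval and the last attempt time.
type AttemptRow = { attempts: number; lastTimestamp: number };

class ConnectionLimiter {
  private readonly rows = new Map<string, AttemptRow>();
  private readonly limit: number;
  private readonly intervalMs: number;

  constructor(limit = 300, intervalMs = 5 * 60 * 1000) {
    this.limit = limit;
    this.intervalMs = intervalMs;
  }

  // Records a connection attempt and returns true if it is within the limit.
  attempt(ip: string, now: number = Date.now()): boolean {
    const currentInterval = Math.floor(now / this.intervalMs);
    const row = this.rows.get(ip);
    // A new interval resets the count to zero before counting this attempt.
    if (row === undefined || Math.floor(row.lastTimestamp / this.intervalMs) !== currentInterval) {
      this.rows.set(ip, { attempts: 1, lastTimestamp: now });
      return true;
    }
    row.attempts += 1;
    row.lastTimestamp = now;
    return row.attempts <= this.limit;
  }
}
```

With a 1 minute interval, an attempt at 00:01:23.456 falls in the interval starting at 00:01:00, and the count starts again at 00:02:00.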

Auto-subscribe from URL params

Letting the user provide the full subscribe payload in the URL allows them to do both in one step:

  • Connect to WebSockets
  • Subscribe based on params

Example:

ws://localhost:3000/subscribe?chain={CHAIN}&moduleHash={MODULE_HASH}
ws://localhost:3000/subscribe?chain=polygon&moduleHash=0a363b2a63aadb76a525208f1973531d3616fbae
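
Extracting the subscription from such a URL could be sketched as follows. `subscriptionFromUrl` is a hypothetical helper, and treating `chain` as optional is an assumption based on the module-wide topic mentioned elsewhere in these issues.

```typescript
// Parses ws://host/subscribe?chain=...&moduleHash=... into subscribe params.
type UrlSubscription = { chain: string | null; moduleHash: string };

function subscriptionFromUrl(url: string): UrlSubscription | null {
  const { pathname, searchParams } = new URL(url);
  if (pathname !== "/subscribe") return null;
  const moduleHash = searchParams.get("moduleHash");
  // moduleHash is required; chain is treated as optional (assumption).
  if (moduleHash === null || moduleHash === "") return null;
  return { chain: searchParams.get("chain"), moduleHash };
}
```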

Add Basic Authorization (Bun.password)

Deploy to npm

Scope

  • build for npm package
  • add GitHub actions to auto-deploy to npm

This will allow running the sink without any additional install:

$ bunx pinax-network/substreams-sink-websockets

Refactor HTTP fetch to use `hono`

Since a lot of our APIs are being built with Hono, we should be able to integrate Hono + Bun WebSockets.

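import { Hono } from 'hono'
import type { ServerWebSocket } from 'bun'

const app = new Hono()
app.get('/', (c) => c.text('Hello Bun!'))

// Placeholder WebSocket handlers; replace with the sink's real handlers
const websocket = {
  open(ws: ServerWebSocket) {},
  close(ws: ServerWebSocket) {},
  message(ws: ServerWebSocket, message: string | Buffer) {},
}

// Bun serves the default export: HTTP through Hono, WebSockets through `websocket`.
// Note: a route must still call server.upgrade(req) for connections to reach these handlers.
export default {
  port: 3000,
  fetch: app.fetch,
  websocket,
}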

`/health` check based on Webhook sessions being connected

Reference: https://github.com/pinax-network/substreams-sink/blob/main/src/health.ts#L12-L14

  • /health should only return OK if at least one Trace ID exists (can use Prometheus Metrics or SQLite to fetch information)
  • Save incoming unique TraceID from Webhooks to SQLite
  • Add Prometheus Metrics to know how many new WebHook Trace IDs have been initialized total_trace_ids
  • Add HTTP GET API to know which Webhook Trace IDs have previously connected /trace_ids
    • Returns Array of String [Trace ID, Epoch timestamp]

HTTP GET /trace_ids

[
  ["654b2e1fd43e8468863595baaad68627", 1696005169]
]

Error format

Update error messages to the following format:

{
  id,
  status,
  error: {
    code,
    msg,
    data: {}
  }
}

Features

To update

  • Already subscribed to
  • ModuleHash not found

Example

{
  "id": "fc93a61a-a192-4cf4-bb2a-a8f0f0c51e06",
  "status": 418,
  "error": {
    "code": -1003,
    "msg": "Way too much request weight used; IP banned until 1659146400000. Please use WebSocket Streams for live updates to avoid bans.",
    "data": {
      "serverTime": 1659142907531,
      "retryAfter": 1659146400000
    }
  }
}
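
An error in this format could be built with a helper like the one below. This is a sketch: `makeError` is a hypothetical name, and the status and code values used in the example are placeholders, not codes defined by the project.

```typescript
// Hypothetical builder for the { id, status, error: { code, msg, data } } envelope.
type WsErrorResponse = {
  id: string;
  status: number;
  error: { code: number; msg: string; data: Record<string, unknown> };
};

function makeError(
  id: string,
  status: number,
  code: number,
  msg: string,
  data: Record<string, unknown> = {},
): string {
  const response: WsErrorResponse = { id, status, error: { code, msg, data } };
  return JSON.stringify(response);
}

// Placeholder status/code for one of the messages listed under "To update".
const notFound = makeError("fc93a61a-a192-4cf4-bb2a-a8f0f0c51e06", 400, -1, "ModuleHash not found");
```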

Add `tslog`

  • improve logging by adding tslog

Output

$ bun start
$ bun run index.ts
2023-10-07 20:32:28.565 INFO            substreams-sink-websockets      Server listening on http://0.0.0.0:3000
2023-10-07 20:32:28.566 INFO            substreams-sink-websockets      Verifying with PUBLIC_KEY a3cb7366ee8ca77225b4d41772e270e4e831d171d1de71d91707c42e7ba82cc9
2023-10-07 20:32:28.566 INFO            substreams-sink-websockets      Reading SQLITE_FILENAME db.sqlite

add `unsubscribe` WebSocket method

  • add unsubscribe method
    • by ModuleHash
    • by ModuleHash from chain

unsubscribe Request

{
    "method": "unsubscribe",
    "params": {
        "chain": "bsc",
        "moduleHash": "0a363b2a63aadb76a525208f1973531d3616fbae"
    }
}

add `/cursor/latest` as text/plain

Use case

Allows Webhook sinks to connect directly to WebSocket sink's latest cursor endpoint

Params

  • chain
  • module_hash

SQL Statement

SELECT latest_cursor
FROM module_hashes
WHERE chain = '${chain}' AND module_hash = '${moduleHash}'
LIMIT 1

Endpoint /cursor/latest

  • status 200: returns cursor as text/plain
  • status 400: bad request
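
The endpoint logic can be sketched without a database by using a `Map` as a stand-in for the `module_hashes` table. `latestCursor` is a hypothetical function, and returning 400 for an unknown chain and module hash pair is an assumption, since the issue only lists 200 and 400. With a real SQLite query, bound parameters would be safer than interpolating `chain` and `moduleHash` into the statement.

```typescript
// Stand-in for the module_hashes table, keyed by "<chain>:<moduleHash>" (assumption).
type CursorResult = { status: 200 | 400; body: string };

function latestCursor(
  cursors: ReadonlyMap<string, string>,
  chain: string | null,
  moduleHash: string | null,
): CursorResult {
  // Both params are required.
  if (!chain || !moduleHash) {
    return { status: 400, body: "chain and module_hash are required" };
  }
  const cursor = cursors.get(`${chain}:${moduleHash}`);
  if (cursor === undefined) {
    return { status: 400, body: "cursor not found" };
  }
  // The body is the cursor itself, to be served as text/plain.
  return { status: 200, body: cursor };
}
```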

Reference

https://github.com/pinax-network/substreams-sink/releases/tag/v0.13.4

Simple HTTP text response

n-5SB30M-16YouthlRFszqWwLpcyB1JpXQPsLRNL1...


Subscribe to `blocks` payload (clock)

For users who do not know the module hash and only need to subscribe to the "block" payload:

  • allow blocks to be subscribed
  • new method: blocks
    • params: chain
  • new topic pushing block (clock) data payload
  • deduplicate clock subscribe from other webhooks using the same chain (don't allow pushing the same clock twice)
    • create a unique Set of block_id
    • only subscribe if block_id is new

Subscribe by blocks only by chain

{
    "method": "blocks",
    "params": {
        "chain": "bsc"
    }
}
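
The deduplication step could be sketched with a `Set` of block IDs, as the list above suggests. `BlockDeduplicator` is a hypothetical name; scoping IDs by chain and capping the set size are assumptions added so the set does not grow without bound.

```typescript
// Tracks block IDs already pushed so the same clock is not published twice.
class BlockDeduplicator {
  private readonly seen = new Set<string>();
  private readonly capacity: number;

  constructor(capacity = 1000) {
    this.capacity = capacity;
  }

  // Returns true the first time a block is seen for a chain, false for duplicates.
  shouldPublish(chain: string, blockId: string): boolean {
    const key = `${chain}:${blockId}`;
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    // A Set iterates in insertion order, so the first value is the oldest entry.
    if (this.seen.size > this.capacity) {
      const oldest = this.seen.values().next().value;
      if (oldest !== undefined) this.seen.delete(oldest);
    }
    return true;
  }
}
```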
