
substreams-sink's Introduction

Substreams Sink CLI Node.js


substreams-sink is a code template for building Substreams sinks in NodeJS. Sinks allow developers to pipe data extracted from a blockchain into a specified application.

📖 Documentation

Further resources

Get Substreams API Key

🚀 Quick start

Installation

npm install substreams-sink

Features

  • includes a Commander.js helper CLI
  • includes a tslog helper logger
  • handles reading/saving the Substreams cursor from a file or URL
  • reads config from a .env file
  • includes Prometheus metrics helpers

CLI

Usage: substreams-sink run [options]

Substreams Sink

Options:
  -v, --version                        version for substreams-sink
  -e --substreams-endpoint <string>    Substreams gRPC endpoint to stream data from (env: SUBSTREAMS_ENDPOINT)
  --manifest <string>                  URL of Substreams package (env: MANIFEST)
  --module-name <string>               Name of the output module (declared in the manifest) (env: MODULE_NAME)
  -s --start-block <int>               Start block to stream from (defaults to -1, which means the initialBlock of the first module you are streaming) (default: "-1", env: START_BLOCK)
  -t --stop-block <int>                Stop block to end stream at, inclusively (env: STOP_BLOCK)
  -p, --params <string...>             Set params for parameterizable modules. Can be specified multiple times. (ex: -p module1=valA -p module2=valX&valY) (default: [], env: PARAMS)
  --substreams-api-key <string>        API key for the Substreams endpoint (env: SUBSTREAMS_API_KEY)
  --delay-before-start <int>           Delay (ms) before starting Substreams (default: 0, env: DELAY_BEFORE_START)
  --cursor <string>                    Cursor to stream from. Leave blank for no cursor
  --production-mode <boolean>          Enable production mode, allows cached Substreams data if available (choices: "true", "false", default: false, env: PRODUCTION_MODE)
  --final-blocks-only <boolean>        Only process blocks that have passed finality, preventing reorgs and undo signals by staying further away from the chain HEAD (choices: "true", "false", default: false, env: FINAL_BLOCKS_ONLY)
  --inactivity-seconds <int>           If set, the sink will stop when inactive for more than the given number of seconds (default: 300, env: INACTIVITY_SECONDS)
  --headers [string...]                Set headers that will be sent on every request (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS)
  --plaintext <boolean>                Establish gRPC connection in plaintext (choices: "true", "false", default: false, env: PLAIN_TEXT)
  --verbose <boolean>                  Enable verbose logging (choices: "true", "false", default: false, env: VERBOSE)
  --hostname <string>                  The process will listen on this hostname for any HTTP and Prometheus metrics requests (default: "localhost", env: HOSTNAME)
  --port <int>                         The process will listen on this port for any HTTP and Prometheus metrics requests (default: 9102, env: PORT)
  --metrics-labels [string...]         Apply generic labels to all default metrics (ex: --metrics-labels foo=bar) (default: {}, env: METRICS_LABELS)
  --collect-default-metrics <boolean>  Collect default metrics (choices: "true", "false", default: false, env: COLLECT_DEFAULT_METRICS)
  -h, --help                           display help for command

Example

.env

# Get Substreams API Key
# https://app.pinax.network
# https://app.streamingfast.io/
SUBSTREAMS_API_KEY=...
SUBSTREAMS_ENDPOINT=https://eth.substreams.pinax.network:443

# SPKG
MANIFEST=https://github.com/pinax-network/substreams/releases/download/blocks-v0.1.0/blocks-v0.1.0.spkg
MODULE_NAME=map_blocks
START_BLOCK=1000000
STOP_BLOCK=1000020

example.js

import { commander, setup, prometheus, http, logger, fileCursor } from "substreams-sink";

const pkg = {
  name: "substreams-sink",
  version: "0.0.1",
  description: "Substreams Sink long description",
}

// Setup CLI using Commander
const program = commander.program(pkg);
const command = commander.addRunOptions(program);
logger.setName(pkg.name);

// Custom Prometheus Counters
const customCounter = prometheus.registerCounter("custom_counter");

command.action(async options => {
  // Get cursor from file
  const cursor = fileCursor.readCursor("cursor.lock");

  // Setup sink for Block Emitter
  const { emitter } = await setup({...options, cursor});

  emitter.on("session", (session) => {
    console.log(session);
  });

  emitter.on("progress", (progress) => {
    console.log(progress);
  });

  // Stream Blocks
  emitter.on("anyMessage", (message, cursor, clock) => {
    customCounter?.inc(1);
    console.log(message);
    console.log(cursor);
    console.log(clock);
  });

  // Setup HTTP server & Prometheus metrics
  http.listen(options);

  // Save new cursor on each new block emitted
  fileCursor.onCursor(emitter, "cursor.lock");

  // Close HTTP server on close
  emitter.on("close", () => {
    http.server.close();
    console.log("โœ… finished");
  })

  // Start the stream
  emitter.start();
})
program.parse();
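
With the .env file above in place, the sink is started via the run subcommand (a usage sketch; options may also be passed as CLI flags instead of the .env file):

node example.js run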

substreams-sink's People

Contributors

chamorin, deniscarriere, sdevkc

substreams-sink's Issues

API token should be optional

There are cases where an API token is not necessary (internal use, for example), so the sink should not enforce this.
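
A minimal sketch of what this could look like in setup(), assuming the parsed options expose the key as options.substreamsApiKey (per the --substreams-api-key flag above) and that auth is sent as a bearer header (an assumption):

// Sketch: attach authentication only when an API key is provided,
// rather than failing when it is missing.
if (options.substreamsApiKey) {
    headers.set("Authorization", `Bearer ${options.substreamsApiKey}`);
}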

Export additional data to `setup()`

When using setup(), export additional data:

  • Include Module Hash (using createModuleHash method)
  • Include Substreams binary package
  • Include start Cursor
const { emitter, substreamPackage, moduleHash, startCursor } = await setup(options, pkg);

All values computed during the setup step should be provided to createRequest:

const request = createRequest({
    substreamPackage,
    outputModule,
    startBlockNum,
    stopBlockNum,
    productionMode,
    startCursor: cursor.readCursor(cursorFile),
});

handle pushing manifest Prometheus metrics

  • Replacement for
  • Include createModuleHash
// Assumed imports: Gauge and register from prom-client; registerGauge, logger and BlockEmitter from this package's helpers
export function handleManifest(substreams: BlockEmitter, manifest: string, hash: string) {
    logger.info("manifest", { manifest, hash });
    const labelNames = ["hash", "manifest", "outputModule", "host", "auth", "startBlockNum", "productionMode"];
    registerGauge("manifest", "Substreams manifest and sha256 hash of map module", labelNames);
    const gauge = register.getSingleMetric("manifest") as Gauge;
    gauge.labels({
        hash,
        manifest,
        outputModule: substreams.outputModule,
        host: substreams.host,
        auth: substreams.auth,
        startBlockNum: substreams.startBlockNum,
        productionMode: String(substreams.productionMode)
    }).set(1)
}

Network mapping pulled from Substreams `network`

Substreams packages can include a network field in the substreams.yaml manifest; we should be able to pull this field and dynamically update the Substreams endpoint.

  • extract network value from Substreams package
  • define URL schema for Substreams endpoint
  • replace {network} mapping prior to connecting to Substreams endpoint

Reference

https://github.com/streamingfast/substreams-sink-sql/blob/6921aa5f6b2d023814174281053168a78cc0338e/docs/tutorial/substreams.yaml#L37C1-L38C1

Example

https://{network}.substreams.pinax.network:9000

With substreams.yaml declaring network: polygon, this resolves to:

https://polygon.substreams.pinax.network:9000
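
A minimal sketch of the {network} substitution, assuming the network value has already been read from the package manifest:

// Sketch: resolve a {network} placeholder in an endpoint template using the
// `network` field from the Substreams package manifest.
export function resolveEndpoint(template: string, network?: string): string {
    if (!template.includes("{network}")) return template;
    if (!network) throw new Error("manifest does not declare `network`, cannot resolve {network} in endpoint");
    return template.replace("{network}", network);
}

// resolveEndpoint("https://{network}.substreams.pinax.network:9000", "polygon")
// => "https://polygon.substreams.pinax.network:9000"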

PRODUCTION_MODE with INACTIVITY_SECONDS

When PRODUCTION_MODE=true and INACTIVITY_SECONDS is set, there is a difference between the sink actually being inactive or not: right now it just exits after INACTIVITY_SECONDS even if it is processing in the background.

I'm not sure how inactivity should be defined here, but at least the sink should not be considered inactive while response.message.value.processedBytes.totalBytesRead is increasing over time.

Just a quick thought: maybe in onInactivitySeconds we can do something like the following, then compare the counters as sketched after the snippet:

let lastTotalBytesRead: bigint = 0n;
let currentTotalBytesRead: bigint = 0n;

...

emitter.on("progress", (progress) => {
    if (progress.processedBytes) {
        currentTotalBytesRead = progress.processedBytes.totalBytesRead;
    }
});
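
The inactivity timer could then compare the two counters (a sketch; the surrounding onInactivitySeconds wiring, logger, and options are assumed to be in scope):

// Sketch: only treat the sink as inactive if no new bytes were read since
// the last check; otherwise record the progress and keep waiting.
setInterval(() => {
    if (currentTotalBytesRead > lastTotalBytesRead) {
        lastTotalBytesRead = currentTotalBytesRead; // still making progress
        return;
    }
    logger.error(`no activity for over ${options.inactivitySeconds} seconds, exiting`);
    process.exit(1);
}, options.inactivitySeconds * 1000);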

Add `--final-blocks-only` argument

  • --final-blocks-only Only process blocks that have passed finality, preventing reorgs and undo signals by staying further away from the chain HEAD (see the registration sketch below)
    • .env FINAL_BLOCKS_ONLY
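
A registration sketch, mirroring the --collect-default-metrics option in the next issue (DEFAULT_FINAL_BLOCKS_ONLY is an assumed constant):

command.option('--final-blocks-only <boolean>', "Only process blocks that have passed finality", DEFAULT_FINAL_BLOCKS_ONLY);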

Support `--collect-default-metrics` in sink

if (options.collectDefaultMetrics) collectDefaultMetrics(options.labels);

bin/cli.ts

command.option('--collect-default-metrics <boolean>', "Collect default metrics", DEFAULT_COLLECT_DEFAULT_METRICS);

error โŒ headers.set is not a function

Seen when using https://github.com/pinax-network/substreams-sink-webhook:

    // User parameters
    const outputModule = options.moduleName;
    const startBlockNum = options.startBlock;
    const stopBlockNum = options.stopBlock;
    const params = options.params;
    const headers = options.headers;
    const cursorPath = options.cursorPath;
    const productionMode = !options.disableProductionMode;
    // Adding default headers
    headers.set("User-Agent", "substreams-sink");
> [email protected] start
> tsc && node --no-warnings ./dist/bin/cli.js run

file:///Users/denis/Github/substreams-sink-webhook/node_modules/substreams-sink/dist/src/setup.js:33
    headers.set("User-Agent", "substreams-sink");
            ^

TypeError: headers.set is not a function
    at setup (file:///Users/denis/Github/substreams-sink-webhook/node_modules/substreams-sink/dist/src/setup.js:33:13)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async Command.action (file:///Users/denis/Github/substreams-sink-webhook/dist/index.js:9:37)

Node.js v20.5.0
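
A possible fix (sketch): the CLI parses --headers into a plain object, so it needs to be wrapped in a Headers instance before .set() is called:

// Sketch: normalize the parsed headers object into a Headers instance
// so that .set() is available.
const headers = new Headers(options.headers);
headers.set("User-Agent", "substreams-sink");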

error: token.includes("."))

file:///Users/denis/.nvm/versions/node/v21.6.2/lib/node_modules/substreams-sink-csv/node_modules/substreams-sink/dist/src/setup.js:23
    if (token.includes("."))
              ^

TypeError: Cannot read properties of undefined (reading 'includes')
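
A possible guard (sketch), assuming the "." check is meant to distinguish JWT tokens from plain API keys:

// Sketch: token is undefined when no API key or token is provided,
// so check for existence before inspecting its format.
if (token && token.includes(".")) {
    // ...handle as a JWT (assumption about the original intent)
}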

Add `list` command to all default sinks

This could be a good way to do it with @substreams/core:

program.command('list')
    .showHelpAfterError()
    .description('List all compatible output modules for a given Substreams package')
    .argument('<spkg>', 'URL or IPFS hash of Substreams package')
    .action(async spkg => {
        const modules = await list(spkg)
        logger.info('list', {modules})
        process.stdout.write(JSON.stringify(modules) + '\n')
    })
// Assumed imports: readPackage/getModules from @substreams/core (per the note above); MESSAGE_TYPE_NAMES from this package
export async function list(url: string) {
    const spkg = await readPackage(url)
    const compatible = []

    for (const { name, output } of getModules(spkg)) {
        if (!output) continue
        logger.info('module', { name, output })
        if (!MESSAGE_TYPE_NAMES.includes(output.type.replace('proto:', ''))) continue
        compatible.push(name)
    }

    return compatible
}
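
Hypothetical invocation, using the blocks package from the Quick start example (actual output depends on the package's modules):

node ./dist/bin/cli.js list https://github.com/pinax-network/substreams/releases/download/blocks-v0.1.0/blocks-v0.1.0.spkg
# => ["map_blocks"]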

Add `progress` prometheus metrics

export function onPrometheusMetrics(emitter: BlockEmitter) {
   ...
   emitter.on("progress", handleProgress);
}

export function handleProgress(progress: ModulesProgress) {
    ...
}

References

https://github.com/streamingfast/substreams/blob/47ddf079aa7fcd89c03eb99771cda6749b56c20e/proto/sf/substreams/rpc/v2/service.proto#L132-L145

// ModulesProgress is a message that is sent every 500ms
message ModulesProgress {
  // previously: repeated ModuleProgress modules = 1;
  // these previous `modules` messages were sent in bursts and are not sent anymore.
  reserved 1;
  // List of jobs running on tier2 servers
  repeated Job running_jobs = 2;
  // Execution statistics for each module
  repeated ModuleStats modules_stats = 3;
  // Stages definition and completed block ranges
  repeated Stage stages = 4;

  ProcessedBytes processed_bytes = 5;
}

message ProcessedBytes {
  uint64 total_bytes_read = 1;
  uint64 total_bytes_written = 2;
}


message Error {
  string module = 1;
  string reason = 2;
  repeated string logs = 3;
  // FailureLogsTruncated is a flag that tells you if you received all the logs or if they
  // were truncated because you logged too much (fixed limit currently is set to 128 KiB).
  bool logs_truncated = 4;
}


message Job {
    uint32 stage = 1;
    uint64 start_block = 2;
    uint64 stop_block = 3;
    uint64 processed_blocks = 4;
    uint64 duration_ms = 5;
}

message Stage {
    repeated string modules = 1;
    repeated BlockRange completed_ranges = 2;
}

// ModuleStats gathers metrics and statistics from each module, running on tier1 or tier2
// All the 'count' and 'time_ms' values may include duplicate for each stage going over that module
message ModuleStats {
    // name of the module
    string name = 1;

    // total_processed_blocks is the sum of blocks sent to that module code
    uint64 total_processed_block_count = 2;
    // total_processing_time_ms is the sum of all time spent running that module code
    uint64 total_processing_time_ms = 3;

    // external_calls are chain-specific intrinsics, like "Ethereum RPC calls".
    repeated ExternalCallMetric external_call_metrics = 4;

    // total_store_operation_time_ms is the sum of all time spent running that module code waiting for a store operation (ex: read, write, delete...)
    uint64 total_store_operation_time_ms = 5;
    // total_store_read_count is the sum of all the store Read operations called from that module code
    uint64 total_store_read_count = 6;

    // total_store_write_count is the sum of all store Write operations called from that module code (store-only)
    uint64 total_store_write_count = 10;

    // total_store_deleteprefix_count is the sum of all store DeletePrefix operations called from that module code (store-only)
    // note that DeletePrefix can be a costly operation on large stores
    uint64 total_store_deleteprefix_count = 11;

    // store_size_bytes is the uncompressed size of the full KV store for that module, from the last 'merge' operation (store-only)
    uint64 store_size_bytes = 12;

    // total_store_merging_time_ms is the time spent merging partial stores into a full KV store for that module (store-only)
    uint64 total_store_merging_time_ms = 13;

    // store_currently_merging is true if there is a merging operation (partial store to full KV store) on the way.
    bool store_currently_merging = 14;

    // highest_contiguous_block is the highest block in the highest merged full KV store of that module (store-only)
    uint64 highest_contiguous_block = 15;
}
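
A sketch of how these fields could feed gauges (metric names are illustrative; registerGauge follows the helper signature used elsewhere in this package, and field names assume the generated JS bindings for the proto above):

// Illustrative gauges (names are assumptions, not the sink's actual metrics)
const totalBytesRead = prometheus.registerGauge("total_bytes_read", "Total bytes read from the endpoint");
const totalBytesWritten = prometheus.registerGauge("total_bytes_written", "Total bytes written by the endpoint");
const runningJobs = prometheus.registerGauge("running_jobs", "Jobs currently running on tier2 servers");

export function handleProgress(progress: ModulesProgress) {
    if (progress.processedBytes) {
        // uint64 fields arrive as bigint in the generated bindings
        totalBytesRead?.set(Number(progress.processedBytes.totalBytesRead));
        totalBytesWritten?.set(Number(progress.processedBytes.totalBytesWritten));
    }
    runningJobs?.set(progress.runningJobs.length);
}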

Only provide Prometheus metrics at specific endpoint `/metrics`

Only provide Prometheus metrics on a single HTTP route, /metrics; this makes it easier for other routes to be added by other HTTP services.

// Create a local server to serve Prometheus metrics
export const server = http.createServer(async (req, res) => {
    if (!req.url) return;
    if (req.method == "GET") {
        if (req.url === "/metrics") {
            res.writeHead(200, { 'Content-Type': prometheus.registry.contentType });
            return res.end(await prometheus.registry.metrics());
        }
    }
});

include `--labels [...string]` to sink

  • include --metrics-labels as CLI
  • include METRICS_LABELS as .env settings

bin/cli.ts

command.option('-l --labels [string...]', "Apply generic labels to all default metrics (ex: --labels foo=bar)", handleLabels, {})

export function handleLabels(value: string, previous: {}) {
    const params = new URLSearchParams(value);
    return { ...previous, ...Object.fromEntries(params) };
}
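
For reference, each occurrence of the flag merges into the accumulated object:

handleLabels("foo=bar", {})              // => { foo: "bar" }
handleLabels("baz=qux", { foo: "bar" })  // => { foo: "bar", baz: "qux" }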

Provide `/health` check endpoint

  • provide a /health endpoint which returns "OK" with status 200 (or 204)
    • Only provide 200 once Substreams data has been received
  • or some other code in the 400 or 500 range if not ok.

Reference

https://docs.railway.app/deploy/healthchecks

"""
First, make sure your webserver has an endpoint (e.g. /health) that will return a response with HTTP status code 200 when your application is live and ready. If your application needs to do some initialization on startup (running database migrations, populating cache, etc.), it's a good idea to return a non-200 status code from your endpoint until the initialization completes.
"""

Example

export function health() {
    http.server.on("request", async (req, res) => {
        if (!req.url) return;
        const params = new URLSearchParams(req.url.split("?")[1] ?? "");
        try {
            if (req.method == "GET") {
                if ( req.url === "/") return toText(res, banner());
                if ( req.url === "/health" ) {
                    const messages = await getSingleMetric("substreams_sink_data_message")
                    if ( messages ) return toText(res, "OK");
                    return toText(res, "no messages received yet", 503);
                }
            }
            // throw new Error(`invalid request`);
        } catch (err: any) {
            res.statusCode = 400;
            return res.end(err.message);
        }
    });
}

A metric with the name `X` has already been registered

Receiving the following error

{"nativeError":{},"name":"Error","message":"A metric with the name trace_calls has already been registered.","stack":[{"fullFilePath":"/node_modules/prom-client/lib/registry.js:67:10","fileName":"registry.js","fileNameWithLine":"registry.js:67","fileColumn":"10","fileLine":"67","filePath":"node_modules/prom-client/lib/registry.js","filePathWithLine":"node_modules/prom-client/lib/registry.js:67","method":"Registry.registerMetric"},{"fullFilePath":"/node_modules/prom-client/lib/metric.js:48:13","fileName":"metric.js","fileNameWithLine":"metric.js:48","fileColumn":"13","fileLine":"48","filePath":"node_modules/prom-client/lib/metric.js","filePathWithLine":"node_modules/prom-client/lib/metric.js:48","method":"new Metric"},

We might need to check whether the gauge/counter already exists before registering it a second time:

if ( !prometheus.registry.getSingleMetric(name) ) {
    prometheus.registerGauge(name, "custom help", Object.keys(labels));
}
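
This check could also be wrapped into a small helper (a sketch; getOrRegisterGauge is a hypothetical name):

// Sketch: return the existing metric instead of registering a duplicate
// (registerGauge per this package's Prometheus helpers).
export function getOrRegisterGauge(name: string, help: string, labelNames: string[] = []) {
    const existing = prometheus.registry.getSingleMetric(name);
    if (existing) return existing;
    return prometheus.registerGauge(name, help, labelNames);
}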
