
Substreams Sink Webhook

Home Page: https://github.com/pinax-network/substreams-sink-webhook/pkgs/container/substreams-sink-webhook

TypeScript 90.65% JavaScript 8.87% Dockerfile 0.49%
streamingfast substreams thegraph webhook


substreams-sink-webhook is a tool that lets developers pipe data extracted from a blockchain to a webhook endpoint.

HTTP Server examples

📖 References

POST Message

The POST message will be a JSON object with the following structure:

headers

POST http://localhost:3000 HTTP/1.1
content-type: application/json
x-signature-ed25519: 8f01c66ccda5b987c43d913290419572ea586dbef2077fa166c4a84797e1d2c76b305bc67ed43efb1fc841562620a61cb59c4d8a13de689a2e98ead19190f80c
x-signature-timestamp: 1707776632

body

The body is a flattened JSON string with the following structure:

{
  "status": 200,
  "cursor": "OJGbpO9ZnZcwvxW38_FO8KWwLpcyA1lrUQPgKRFL04Py8yCW35v1VTB1O0-Elami3RztQlOp2tmcHC9y9ZQFuoDrxLpj6yU-FXorwoHr_OfqLPumMQwTJ-hgWeuKYNLeWDjTagn4ersEtNGzbvLaY0UxZZUhK2G62z1VptdXJfEWuiJmyjmrIZrRhK-WoNAS_rEkQ7L1xCmhDzJ4K0dTPcSDNPKZuDR2",
  "session": {
    "traceId": "3cbb0a1c772a47a72995d95f4c6d2cff",
    "resolvedStartBlock": 53448515
  },
  "clock": {
    "timestamp": "2024-02-12T22:23:51.000Z",
    "number": 53448530,
    "id": "f843bc26cea0cbd50b09699546a8a97de6a1727646c17a857c5d8d868fc26142"
  },
  "manifest": {
    "substreamsEndpoint": "https://polygon.substreams.pinax.network:443",
    "chain": "polygon",
    "finalBlockOnly": "false",
    "moduleName": "map_blocks",
    "type": "sf.substreams.v1.Clock",
    "moduleHash": "44c506941d5f30db6cca01692624395d1ac40cd1"
  },
  "data": {
    ...
  }
}
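
For reference, a minimal receiver for this payload could look like the sketch below (shown with Bun.serve, matching the fetch-style request handling used in the signature example further down; adapt it to your own HTTP server):

// Minimal sketch of a webhook receiver; illustrative only, not part of this repository.
const server = Bun.serve({
  port: 3000,
  async fetch(request) {
    const body = await request.text();  // keep the raw body if you also validate the signature
    const payload = JSON.parse(body);   // structure shown above
    console.log(payload.clock.number, payload.manifest.moduleName);
    return new Response("OK", { status: 200 });
  },
});
console.log(`listening on http://localhost:${server.port}`);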

Generate Ed25519 public & private key pair

$ bunx substreams-sink-webhook keypair
{
  "publicKey": "36a89085d54d866c60ecccc2bf332d1c0dd5f1a810af175b1cfb7ff9e64b67d6",
  "privateKey": "67603675f8160b4e4ca67770eaf7df797f3a9617665a84ec3e9baf92c403fb4f"
}

or using curl

$ curl http://localhost:9102/keypair
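
For illustration, a comparable key pair can also be generated directly with tweetnacl (the same library used for validation below). Note that nacl.sign.keyPair() returns a 64-byte secretKey, while the CLI prints a 32-byte private key, so the two formats are not necessarily interchangeable:

// Illustrative sketch only; this is not the CLI's implementation.
import nacl from "tweetnacl";

const { publicKey, secretKey } = nacl.sign.keyPair();
console.log({
  publicKey: Buffer.from(publicKey).toString("hex"),
  privateKey: Buffer.from(secretKey).toString("hex"),
});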

Validate Ed25519 signature

import nacl from "tweetnacl";

// ...HTTP server
const PUBLIC_KEY = "APPLICATION_PUBLIC_KEY";

// get headers and body from POST request
const signature = request.headers.get("x-signature-ed25519");
const timestamp = request.headers.get("x-signature-timestamp");
const body = await request.text();

// reject requests that are missing the signature headers
if (!signature || !timestamp) {
  return new Response("missing signature headers", { status: 400 });
}

// validate signature using public key
// (the signature covers the timestamp concatenated with the raw body)
const isVerified = nacl.sign.detached.verify(
  Buffer.from(timestamp + body),
  Buffer.from(signature, "hex"),
  Buffer.from(PUBLIC_KEY, "hex")
);

if (!isVerified) {
  return new Response("invalid request signature", { status: 401 });
}
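
For completeness, the sender presumably signs the concatenation of the timestamp and the raw body with the configured Ed25519 private key. A rough sketch of that signing step, assuming the 32-byte key printed by the keypair command is an Ed25519 seed (an assumption, not the sink's confirmed implementation):

import nacl from "tweetnacl";

// Assumption: PRIVATE_KEY is the 32-byte hex seed printed by `keypair`.
const seed = Buffer.from(process.env.PRIVATE_KEY ?? "", "hex");
const { secretKey } = nacl.sign.keyPair.fromSeed(seed);

const timestamp = Math.floor(Date.now() / 1000).toString();
const body = JSON.stringify({ /* payload */ });

// Sign timestamp + body, matching the verification above.
const signature = Buffer.from(
  nacl.sign.detached(Buffer.from(timestamp + body), secretKey)
).toString("hex");
// signature -> x-signature-ed25519 header, timestamp -> x-signature-timestamp header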

.env Environment variables

# Webhook
PORT=9102
WEBHOOK_URL=http://127.0.0.1:3000

# Get Substreams API Key
# https://app.pinax.network
# https://app.streamingfast.io/
SUBSTREAMS_API_KEY=<Substreams API Token @ https://pinax.network>

# Substreams Package (*.spkg)
MANIFEST=https://github.com/pinax-network/substreams/releases/download/blocks-v0.1.0/blocks-v0.1.0.spkg
MODULE_NAME=map_blocks
START_BLOCK=-10
PRODUCTION_MODE=true

# Webhook (Optional)
PRIVATE_KEY=<Ed25519 Private Key>
MAXIMUM_ATTEMPTS=100
VERBOSE=true
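
With these variables in a .env file (Bun loads .env automatically), the sink can be started the same way as the keypair command above:

$ bunx substreams-sink-webhook run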

Help

$ substreams-sink-webhook --help

Usage: substreams-sink-webhook run [options]

Substreams Sink Webhook

Options:
  -e --substreams-endpoint <string>    Substreams gRPC endpoint to stream data from (env: SUBSTREAMS_ENDPOINT)
  --manifest <string>                  URL of Substreams package (env: MANIFEST)
  --module-name <string>               Name of the output module (declared in the manifest) (env: MODULE_NAME)
  -s --start-block <int>               Start block to stream from (defaults to -1, which means the initialBlock of the first module you are streaming) (default: "-1", env: START_BLOCK)
  -t --stop-block <int>                Stop block to end stream at, inclusively (env: STOP_BLOCK)
  -p, --params <string...>             Set a params for parameterizable modules. Can be specified multiple times. (ex: -p module1=valA -p module2=valX&valY) (default: [], env: PARAMS)
  --substreams-api-key <string>        API key for the Substream endpoint (env: SUBSTREAMS_API_KEY)
  --delay-before-start <int>           Delay (ms) before starting Substreams (default: 0, env: DELAY_BEFORE_START)
  --cursor-path <string>               File path or URL to cursor lock file (default: "cursor.lock", env: CURSOR_PATH)
  --http-cursor-auth <string>          Basic auth credentials for http cursor (ex: username:password) (env: HTTP_CURSOR_AUTH)
  --production-mode <boolean>          Enable production mode, allows cached Substreams data if available (default: "false", env: PRODUCTION_MODE)
  --inactivity-seconds <int>           If set, the sink will stop when inactive for over a certain amount of seconds (default: 300, env: INACTIVITY_SECONDS)
  --hostname <string>                  The process will listen on this hostname for any HTTP and Prometheus metrics requests (default: "localhost", env: HOSTNAME)
  --port <int>                         The process will listen on this port for any HTTP and Prometheus metrics requests (default: 9102, env: PORT)
  --metrics-labels [string...]         To apply generic labels to all default metrics (ex: --labels foo=bar) (default: {}, env: METRICS_LABELS)
  --collect-default-metrics <boolean>  Collect default metrics (default: "false", env: COLLECT_DEFAULT_METRICS)
  --headers [string...]                Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS)
  --final-blocks-only <boolean>        Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD (default: "false", env: FINAL_BLOCKS_ONLY)
  --verbose <boolean>                  Enable verbose logging (default: "false", env: VERBOSE)
  --webhook-url <string>               Webhook URL to send POST (env: WEBHOOK_URL)
  --private-key <string>               Ed25519 private key to sign POST data payload (env: PRIVATE_KEY)
  --maximum-attempts <number>          Maximum attempts to retry POST (default: 100, env: MAXIMUM_ATTEMPTS)
  -h, --help                           display help for command

Docker environment

Pull from GitHub Container registry

docker pull ghcr.io/pinax-network/substreams-sink-webhook:latest

Run with .env file

docker run -it --rm --env-file .env ghcr.io/pinax-network/substreams-sink-webhook:latest run

Build from source

docker build -t substreams-sink-webhook .

Features

  • POST data to URL
  • Include Substreams Manifest to payload
    • substreamsEndpoint
    • chain
    • finalBlockOnly
    • moduleName
    • type
    • moduleHash
  • Include Substreams Clock to payload
    • timestamp
    • number
    • id
  • Includes Substreams Session to payload
    • traceId
    • resolvedStartBlock
  • Includes Substreams cursor to payload
  • Signing policy
    • TweetNaCl
    • R1 private keys
    • Ether.js
  • All messages are sent in block order, no need to parallelize
  • Support for Final Blocks Only --final-blocks-only
  • Support for Production Mode --production-mode
  • Retry policy (see the sketch after this list)
    • Exponential backoff (2x)
    • Initial Interval (1s)
    • Maximum Attempts (Infinity)
    • Maximum Interval (100 * initialInterval)
  • Dockerfile
  • Provide CLI arguments or Environment Variables (.env)
  • Allow params injection via --params or -p
  • Prometheus metrics
    • includes metrics from substreams-sink
    • HTTP POST requests
    • HTTP errors
  • PING URL on start (invalid + valid)
  • Save cursor.lock only after successful POST
    • Enforce retry policy on HTTP cursor updates
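
For clarity, the retry policy listed above corresponds to the following backoff schedule (an illustrative sketch, not the sink's actual implementation):

// Illustrative sketch of the documented retry policy; not the sink's actual code.
const initialInterval = 1_000;                 // 1s
const backoffCoefficient = 2;                  // exponential backoff (2x)
const maximumInterval = 100 * initialInterval; // 100s cap

// Delay before retry attempt n (n starts at 0): 1s, 2s, 4s, ... capped at 100s.
function retryDelay(attempt: number): number {
  return Math.min(initialInterval * backoffCoefficient ** attempt, maximumInterval);
}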

substreams-sink-webhook's People

Contributors

chamorin, deniscarriere, gcleroux, julienr1


substreams-sink-webhook's Issues

protocol error: promised `X` bytes in enveloped message, got `Y` bytes

This error occurs when using https://github.com/pinax-network/substreams-clock/releases/download/v0.1.0/clock-v0.1.0.spkg

file:///Users/denis/Github/substreams-sink-webhook/node_modules/@bufbuild/connect-node/node_modules/@bufbuild/connect/dist/esm/protocol/async-iterable.js:604
                throw new ConnectError(message, Code.InvalidArgument);
                      ^

ConnectError: [invalid_argument] protocol error: promised 486 bytes in enveloped message, got 470 bytes
    at file:///Users/denis/Github/substreams-sink-webhook/node_modules/@bufbuild/connect-node/node_modules/@bufbuild/connect/dist/esm/protocol/async-iterable.js:604:23
    at Generator.next (<anonymous>)
    at resume (file:///Users/denis/Github/substreams-sink-webhook/node_modules/@bufbuild/connect-node/node_modules/@bufbuild/connect/dist/esm/protocol/async-iterable.js:27:44)
    at fulfill (file:///Users/denis/Github/substreams-sink-webhook/node_modules/@bufbuild/connect-node/node_modules/@bufbuild/connect/dist/esm/protocol/async-iterable.js:29:31)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  rawMessage: 'protocol error: promised 486 bytes in enveloped message, got 470 bytes',
  code: 3,
  metadata: Headers {
    [Symbol(headers list)]: HeadersList {
      cookies: null,
      [Symbol(headers map)]: Map(0) {},
      [Symbol(headers map sorted)]: null
    },
    [Symbol(guard)]: 'none'
  },
  details: [],
  cause: undefined
}

Running out of memory

  • Occurs when running multiple webhooks

โŒ error

<--- Last few GCs --->

[85924:0x128008000]  1190759 ms: Mark-Compact 4037.0 (4129.9) -> 4022.9 (4130.2) MB, 514.08 / 0.00 ms  (average mu = 0.183, current mu = 0.201) task; scavenge might not succeed
[85924:0x128008000]  1191361 ms: Mark-Compact 4038.7 (4130.2) -> 4023.1 (4130.4) MB, 561.75 / 0.00 ms  (average mu = 0.126, current mu = 0.067) task; scavenge might not succeed


<--- JS stacktrace --->

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory
 1: 0x100a7a0e4 node::Abort() [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
 2: 0x100a7a2cc node::ModifyCodeGenerationFromStrings(v8::Local<v8::Context>, v8::Local<v8::Value>, bool) [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
 3: 0x100bfe018 v8::internal::V8::FatalProcessOutOfMemory(v8::internal::Isolate*, char const*, v8::OOMDetails const&) [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
 4: 0x100dd26e8 v8::internal::Heap::GarbageCollectionReasonToString(v8::internal::GarbageCollectionReason) [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
 5: 0x100dd659c v8::internal::Heap::CollectGarbageShared(v8::internal::LocalHeap*, v8::internal::GarbageCollectionReason) [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
 6: 0x100dd3000 v8::internal::Heap::PerformGarbageCollection(v8::internal::GarbageCollector, v8::internal::GarbageCollectionReason, char const*) [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
 7: 0x100dd0d88 v8::internal::Heap::CollectGarbage(v8::internal::AllocationSpace, v8::internal::GarbageCollectionReason, v8::GCCallbackFlags) [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
 8: 0x100e37fd0 v8::internal::MinorGCJob::Task::RunInternal() [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
 9: 0x100adbaac node::PerIsolatePlatformData::RunForegroundTask(std::__1::unique_ptr<v8::Task, std::__1::default_delete<v8::Task>>) [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
10: 0x100ada7c8 node::PerIsolatePlatformData::FlushForegroundTasksInternal() [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
11: 0x101449658 uv__async_io [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
12: 0x10145b730 uv__io_poll [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
13: 0x101449c1c uv_run [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
14: 0x1009ad754 node::SpinEventLoopInternal(node::Environment*) [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
15: 0x100aba7e8 node::NodeMainInstance::Run(node::ExitCode*, node::Environment*) [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
16: 0x100aba584 node::NodeMainInstance::Run() [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
17: 0x100a45658 node::Start(int, char**) [/Users/denis/.nvm/versions/node/v20.7.0/bin/node]
18: 0x186579058 start [/usr/lib/dyld]
[1]    85924 abort      substreams-sink-webhook run

Handle read ECONNRESET

This happens quite frequently: the gRPC connection is dropped and the webhook process exits when it does.

ConnectError: [unknown] read ECONNRESET
    at ConnectError.from (/Users/denis/Github/substreams-sink-webhook/node_modules/@connectrpc/connect/dist/cjs/connect-error.js:75:20)
    at abort (/Users/denis/Github/substreams-sink-webhook/node_modules/@connectrpc/connect/dist/cjs/protocol/run-call.js:109:55)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async run (/Users/denis/Github/substreams-sink-webhook/node_modules/@connectrpc/connect/dist/cjs/callback-client.js:78:84) {
  rawMessage: 'read ECONNRESET',
  code: 2,
  metadata: HeadersList {
    cookies: null,
    [Symbol(headers map)]: Map(0) {},
    [Symbol(headers map sorted)]: null
  },
  details: [],
  cause: Error: read ECONNRESET
      at TLSWrap.onStreamRead (node:internal/stream_base_commons:217:20) {
    errno: -54,
    code: 'ECONNRESET',
    syscall: 'read'
  }
}

RAM continuously increases over time

We may need to look into garbage collection; RAM appears to be slowly increasing over time.

Growth of 1.5 GB over 4 days

7-day chart; drops are caused by redeployments

Include Session `traceId` in POST metadata

Scope

  • Include session details in POST metadata

const metadata = {
  cursor,
  clock: {
    timestamp: clock.timestamp.toDate().toISOString(),
    number: Number(clock.number),
    id: clock.id,
  },
  manifest: {
    substreamsEndpoint: options.substreamsEndpoint,
    moduleName: options.moduleName,
    moduleHash,
  },
}

session info

{
    "traceId": "3bf605c8ff31f6693498f0c2d815c492",
    "resolvedStartBlock": "18243869",
    "linearHandoffBlock": "18243700",
    "maxParallelWorkers": "100"
}

Reference

https://github.com/substreams-js/substreams-node

To fetch the Session Trace ID:

// Session Trace ID
emitter.on("session", (session) => {
  console.dir(session);
});
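
A possible sketch of merging these session details into the metadata above, matching the session object in the POST body shown earlier (illustrative only):

// Sketch only: capture the session once, then include it in each POST metadata.
let session = {};
emitter.on("session", (s) => {
  session = { traceId: s.traceId, resolvedStartBlock: Number(s.resolvedStartBlock) };
});

// ...when building the metadata for each message:
const metadata = { cursor, session, clock: { /* ... */ }, manifest: { /* ... */ } };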

Add CLI & `.env` configurations for Retry policy

  • Set default values in config
  • Allow to change Retry policy params by:
    • .env variables
    • CLI argument
  • Update README with new CLI arguments
  • Update .env.example with new Env variables

Reference

// Retry Policy
const initialInterval = 1000; // 1s
const maximumAttempts = options.maximumAttempts ?? 100 * initialInterval;
const maximumInterval = 100 * initialInterval;
const backoffCoefficient = 2;
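
A possible sketch of wiring these values to environment variables; MAXIMUM_ATTEMPTS already exists, while the RETRY_* names below are hypothetical:

// MAXIMUM_ATTEMPTS is an existing option; the RETRY_* variable names are hypothetical.
const initialInterval = Number(process.env.RETRY_INITIAL_INTERVAL ?? 1000); // 1s
const maximumAttempts = Number(process.env.MAXIMUM_ATTEMPTS ?? 100);
const maximumInterval = Number(process.env.RETRY_MAXIMUM_INTERVAL ?? 100 * initialInterval);
const backoffCoefficient = Number(process.env.RETRY_BACKOFF_COEFFICIENT ?? 2);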
