andywer / pg-listen

📡 PostgreSQL LISTEN & NOTIFY for node.js that finally works.

License: MIT License

TypeScript 98.17% JavaScript 1.83%
postgresql nodejs typescript pubsub notifications message-passing events

pg-listen's People

Contributors

andywer, benatkin, bergundy, dependabot[bot], kesla, maxpain

pg-listen's Issues

pg Dependency missing

While setting up an empty project with pg-listen to do some testing I ran into an issue.

After installing pg-listen via npm i pg-listen and starting a script with import createSubscriber from "pg-listen", node throws the following error:

internal/modules/cjs/loader.js:889
  const err = new Error(message); 
              ^

Error: Cannot find module 'pg'
Require stack:
- E:\Coding\pg-listener\node_modules\pg-listen\dist\index.js
- E:\Coding\pg-listener\node_modules\pg-listen\index.js
    at Function.Module._resolveFilename (internal/modules/cjs/loader.js:889:15)
    at Function.Module._load (internal/modules/cjs/loader.js:745:27)
    at Module.require (internal/modules/cjs/loader.js:961:19)
    at require (internal/modules/cjs/helpers.js:92:18)
    at Object.<anonymous> (E:\Coding\pg-listener\node_modules\pg-listen\dist\index.js:56:10)
    at Module._compile (internal/modules/cjs/loader.js:1072:14)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:1101:10)
    at Module.load (internal/modules/cjs/loader.js:937:32)
    at Function.Module._load (internal/modules/cjs/loader.js:778:12)
    at Module.require (internal/modules/cjs/loader.js:961:19) {
  code: 'MODULE_NOT_FOUND',
  requireStack: [
    'E:\\Coding\\pg-listener\\node_modules\\pg-listen\\dist\\index.js',
    'E:\\Coding\\pg-listener\\node_modules\\pg-listen\\index.js'
  ]
}

After installing the package "pg" manually, the script started successfully.

Having a look into the package.json of pg-listen, "pg" only appears as peerDependency and devDependency.

Payload must (?) be valid JSON

Hi Andy,

Thanks a lot for your module and the work you've invested in it.

Are there any plans to support notification payloads that are not valid JSON? I was hoping to use it for passing an ID (UUID), but it barfs with:

  pg-listen:notification Received PostgreSQL notification on "b2bc-async": 9a87bf09-9e57-452c-869f-010481781f5b +0ms
Fatal database connection error: SyntaxError: Error parsing PostgreSQL notification payload: Unexpected token a in JSON at position 1
    at JSON.parse (<anonymous>)
    at Client.onNotification (/Users/stbaldwin/dev/node/node_modules/pg-listen/dist/index.js:116:51)
    at Client.emit (events.js:189:13)
    at Connection.<anonymous> (/Users/stbaldwin/dev/node/node_modules/pg/lib/client.js:315:10)
    at Connection.emit (events.js:189:13)
    at Socket.<anonymous> (/Users/stbaldwin/dev/node/node_modules/pg/lib/connection.js:125:12)
    at Socket.emit (events.js:189:13)
    at addChunk (_stream_readable.js:284:12)
    at readableAddChunk (_stream_readable.js:265:11)
    at Socket.Readable.push (_stream_readable.js:220:10)
  pg-listen:connection Closing PostgreSQL notification listener. +1m

I can possibly modify the notifying code to create a JSON payload, but it seems like an 'unnecessary' overhead, and it should be up to the consumer to decide what to do with the payload.

Cheers,

Steve
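
One way to accept both JSON and plain-string payloads is a lenient parser that falls back to the raw text. This is only a sketch: parsePayloadLenient is a hypothetical helper, not part of pg-listen, and whether the library lets you plug in a custom parser is an assumption to check against its documentation.

```typescript
// Hypothetical helper: not part of pg-listen's published API.
// Tries JSON first and falls back to the raw string, so a bare UUID
// such as 9a87bf09-9e57-452c-869f-010481781f5b passes through unchanged.
function parsePayloadLenient(raw: string): unknown {
  try {
    return JSON.parse(raw);
  } catch {
    // Not valid JSON: hand the consumer the original text.
    return raw;
  }
}
```

Note that payloads which happen to be valid JSON literals, such as 123 or true, would still come back as a number or boolean rather than a string.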

Unhandled promise rejection during reconnect

I'm getting unnecessary console spam from unsuccessful reconnect attempts that seem to be caused by the rejection on this line.

To cause temporary interruptions I used the following SQL. For the grant/revoke to work, the user whose connection is being interrupted must not be a superuser. This is probably not the only way to cause an interruption; it could also happen at the network level or by temporarily shutting down the database. Setting retryTimeout to a null value is useful as well.

-- Revoke new connections for non-superusers
REVOKE CONNECT ON DATABASE mydatabase FROM public;
-- Terminate currently active connections for a specific non-superuser 'myuser'
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND usename = 'myuser';
-- Some time later restore ability to reconnect
GRANT CONNECT ON DATABASE mydatabase TO public;

I couldn't really understand in which state end is called in relation to the try/catch block where this code is placed, so I am unsure what the supposed fix should be.

Here's what I'm getting in console:

(node:14040) UnhandledPromiseRejectionWarning: Error: Connection ended.
    at Client.<anonymous> (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg-listen\dist\index.js:89:93)
    at Object.onceWrapper (events.js:286:20)
    at Client.emit (events.js:198:13)
    at process.nextTick (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg\lib\client.js:256:12)
    at process._tickCallback (internal/process/next_tick.js:61:11)
(node:14040) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:14040) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
(node:14040) UnhandledPromiseRejectionWarning: Error: Connection ended.
    at Client.<anonymous> (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg-listen\dist\index.js:89:93)
    at Object.onceWrapper (events.js:286:20)
    at Client.emit (events.js:198:13)
    at process.nextTick (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg\lib\client.js:256:12)
    at process._tickCallback (internal/process/next_tick.js:61:11)
(node:14040) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 2)
(node:14040) UnhandledPromiseRejectionWarning: Error: Connection ended.
    at Client.<anonymous> (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg-listen\dist\index.js:89:93)
    at Object.onceWrapper (events.js:286:20)
    at Client.emit (events.js:198:13)
    at process.nextTick (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg\lib\client.js:256:12)
    at process._tickCallback (internal/process/next_tick.js:61:11)
(node:14040) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 3)
(node:14040) UnhandledPromiseRejectionWarning: Error: Connection ended.
    at Client.<anonymous> (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg-listen\dist\index.js:89:93)
    at Object.onceWrapper (events.js:286:20)
    at Client.emit (events.js:198:13)
    at process.nextTick (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg\lib\client.js:256:12)
    at process._tickCallback (internal/process/next_tick.js:61:11)
(node:14040) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 4)
(node:14040) UnhandledPromiseRejectionWarning: Error: Connection ended.
    at Client.<anonymous> (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg-listen\dist\index.js:89:93)
    at Object.onceWrapper (events.js:286:20)
    at Client.emit (events.js:198:13)
    at process.nextTick (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg\lib\client.js:256:12)
    at process._tickCallback (internal/process/next_tick.js:61:11)
(node:14040) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 5)
(node:14040) UnhandledPromiseRejectionWarning: Error: Connection ended.
    at Client.<anonymous> (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg-listen\dist\index.js:89:93)
    at Object.onceWrapper (events.js:286:20)
    at Client.emit (events.js:198:13)
    at process.nextTick (C:\Users\Indrek\Desktop\node-indexer\node_modules\pg\lib\client.js:256:12)
    at process._tickCallback (internal/process/next_tick.js:61:11)
...repeat...

division by zero error

Periodically having this thrown:

error: division by zero
    at Parser.parseErrorMessage (/var/app/current/node_modules/pg-protocol/src/parser.ts:357:11)
    at Parser.handlePacket (/var/app/current/node_modules/pg-protocol/src/parser.ts:186:21)
    at Parser.parse (/var/app/current/node_modules/pg-protocol/src/parser.ts:101:30)
    at Socket.<anonymous> (/var/app/current/node_modules/pg-protocol/src/index.ts:7:48)
    at /var/app/current/node_modules/dd-trace/packages/dd-trace/src/scope/base.js:54:19
    at Scope._activate (/var/app/current/node_modules/dd-trace/packages/dd-trace/src/scope/async_hooks.js:53:14)
    at Scope.activate (/var/app/current/node_modules/dd-trace/packages/dd-trace/src/scope/base.js:12:19)
    at Socket.bound (/var/app/current/node_modules/dd-trace/packages/dd-trace/src/scope/base.js:53:20)
    at Socket.emit (events.js:315:20)
    at Socket.EventEmitter.emit (domain.js:482:12)
    at addChunk (_stream_readable.js:295:12)
    at readableAddChunk (_stream_readable.js:271:9)
    at Socket.Readable.push (_stream_readable.js:212:10)
    at TCP.onStreamRead (internal/stream_base_commons.js:186:23)

I can work on a minimal repro if it's helpful.

Strangely enough, I am not even connecting to the database, or starting to listen on any channels. The only thing I need to do to produce this error is import the module.

Reconnecting hangs when connection is terminated

While attempting to produce a connection error, I noticed that pg-listen seems to never recover from termination:

SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND usename = 'myusername';

Log:

  pg-listen:connection Creating PostgreSQL client for notification streaming +0ms
  pg-listen:subscription Subscribing to PostgreSQL notification "test-chan" +0ms
  pg-listen:connection DB Client error: { error: terminating connection due to administrator command
    at Connection.parseE (/Users/indrek/test/node_modules/pg/lib/connection.js:601:11)
    at Connection.parseMessage (/Users/indrek/test/node_modules/pg/lib/connection.js:398:19)
    at Socket.<anonymous> (/Users/indrek/test/node_modules/pg/lib/connection.js:120:22)
    at Socket.emit (events.js:189:13)
    at addChunk (_stream_readable.js:284:12)
    at readableAddChunk (_stream_readable.js:265:11)
    at Socket.Readable.push (_stream_readable.js:220:10)
    at TCP.onStreamRead [as onread] (internal/stream_base_commons.js:94:17)
  name: 'error',
  length: 109,
  severity: 'FATAL',
  code: '57P01',
  detail: undefined,
  hint: undefined,
  position: undefined,
  internalPosition: undefined,
  internalQuery: undefined,
  where: undefined,
  schema: undefined,
  table: undefined,
  column: undefined,
  dataType: undefined,
  constraint: undefined,
  file: 'postgres.c',
  line: '2881',
  routine: 'ProcessInterrupts' } +6s
  pg-listen:connection Reconnecting to PostgreSQL for notification streaming +18ms
  pg-listen:connection PostgreSQL reconnection attempt #1... +0ms
  pg-listen:connection DB Client error: Error: Connection terminated unexpectedly
    at Connection.con.once (/Users/indrek/test/node_modules/pg/lib/client.js:223:9)
    at Object.onceWrapper (events.js:277:13)
    at Connection.emit (events.js:189:13)
    at Socket.<anonymous> (/Users/indrek/test/node_modules/pg/lib/connection.js:130:10)
    at Socket.emit (events.js:194:15)
    at endReadableNT (_stream_readable.js:1125:12)
    at process._tickCallback (internal/process/next_tick.js:63:19) +131ms
  pg-listen:connection Reconnecting to PostgreSQL for notification streaming +0ms
  pg-listen:connection PostgreSQL reconnection attempt #1... +1ms
  pg-listen:connection DB Client connection ended +1ms
  pg-listen:connection Reconnecting to PostgreSQL for notification streaming +0ms
  pg-listen:connection PostgreSQL reconnection attempt #1... +0ms

connect function is returning undefined

pg-listen/src/index.ts

Lines 302 to 306 in 7dec032

async connect () {
  initialize(dbClient)
  await dbClient.connect()
  emitter.emit("connected")
},

Is there any reason why connect() returns undefined rather than dbClient? I'm asking because I also need to query the database, and it doesn't make sense to create another connection for that. If connect() returned dbClient, this would be possible.

Socket is closed

Hi,

I am trying to run a node based on your Usage instructions but I get a socket is closed error.
Not sure I am doing it right as it looks like the process on exit handler is called prematurely.

Here is my code:

import createSubscriber from "pg-listen"
import { databaseURL } from "./config"

// Accepts the same connection config object that the "pg" package would take
const subscriber = createSubscriber({ connectionString: databaseURL });

subscriber.notifications.on("my-channel", (payload) => {
  // Payload as passed to subscriber.notify() (see below)
  console.log("Received notification in 'my-channel':", payload)
})

subscriber.events.on("error", (error) => {
  console.error("Fatal database connection error:", error)
  process.exit(1)
})

process.on("exit", () => {
  console.log('Process on exit has been called');
  subscriber.close()
})

export async function connect () {
  console.log('connect called')
  await subscriber.connect()
  await subscriber.listenTo("my-channel")
}

export async function sendSampleMessage () {
  console.log('sending a notify message to postgres');
  await subscriber.notify({
    greeting: "Hey, buddy.",
    timestamp: Date.now()
  })
}

I am running export DEBUG=pg-listen:*; node dist/index.js
Console output is:

pg-listen:connection Creating PostgreSQL client for notification streaming +0ms
Process on exit has been called
  pg-listen:connection Closing PostgreSQL notification listener. +6ms
events.js:173
      throw er; // Unhandled 'error' event
      ^

Error [ERR_SOCKET_CLOSED]: Socket is closed
    at Socket._writeGeneric (net.js:711:18)
    at Socket._write (net.js:733:8)
    at doWrite (_stream_writable.js:415:12)
    at writeOrBuffer (_stream_writable.js:399:5)
    at Socket.Writable.write (_stream_writable.js:299:11)
    at Connection.end (/Users/ra/code/popmetrics/pig/database-notifier/node_modules/pg/lib/connection.js:320:22)
    at Client.end (/Users/ra/code/popmetrics/pig/database-notifier/node_modules/pg/lib/client.js:489:21)
    at Object.close (/Users/ra/code/popmetrics/pig/database-notifier/node_modules/pg-listen/dist/index.js:240:29)
    at process.<anonymous> (/Users/ra/code/popmetrics/pig/database-notifier/dist/index.js:33:14)
    at process.emit (events.js:197:13)
Emitted 'error' event at:
    at errorOrDestroy (internal/streams/destroy.js:98:12)
    at onwriteError (_stream_writable.js:430:5)
    at onwrite (_stream_writable.js:461:5)
    at _destroy (internal/streams/destroy.js:40:7)
    at Socket._destroy (net.js:618:3)
    at Socket.destroy (internal/streams/destroy.js:32:8)
    at Socket._writeGeneric (net.js:711:10)
    at Socket._write (net.js:733:8)
    at doWrite (_stream_writable.js:415:12)
    at writeOrBuffer (_stream_writable.js:399:5)

The database connection is fine and tested.

Cannot find module 'pg-native'

Getting this warning message when importing the package with commonjs

const createPostgresSubscriber = require('pg-listen')

Getting console message: Cannot find module 'pg-native'

Everything seems to be working though. Maybe some problem in dependencies?

Using version [email protected]

Reconnect with infinite retry attempts/timeout does not respond to `close()`

I'm implementing a worker process with the following requirements:

  • The notification listener should retry reconnecting indefinitely. (Infinite retry attempts, no timeout)
  • The worker process should respond to SIGINT signals, and gracefully terminate.

The "reconnect" loop does not respond to any signal from calling close(), so it will attempt to reconnect indefinitely. The promise hangs around and prevents the process from closing normally. (I've even seen odd behaviour where I've killed the process using kill -15 1234, and it returns to the terminal prompt, but I still see "reconnecting" messages logged. 😳)

A rough sketch:

import createPostgresSubscriber from "pg-listen";

async function demo() {
  try {
    // Create the subscriber first so the handlers below can reference it
    const subscriber = createPostgresSubscriber(
      {
        connectionString:
          "postgresql://invalid_user:invalid_password@localhost/postgres"
      },
      {
        retryInterval: 500,
        retryTimeout: Number.POSITIVE_INFINITY
      }
    );

    // The handler must be async to be allowed to await close()
    process.on("SIGTERM", async () => {
      console.log("Attempting graceful termination");
      await subscriber.close();
      console.log("Subscriber closed");
    });

    subscriber.events.on("reconnect", () => {
      console.log("Reconnecting...");
    });

    await subscriber.connect();
    await subscriber.listenTo("some_channel");
  } catch (err) {
    console.error(err);
  }
}

demo().then(
  () => console.log("Done"),
  err => console.error("Error", err)
);
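
For illustration, a retry loop that can be interrupted might look like the sketch below. All names here (CancelToken, retryUntilCancelled, pause) are hypothetical and this is not pg-listen's internal code; it only shows the general idea of checking a cancellation flag between attempts.

```typescript
// Hypothetical names throughout; this is not pg-listen's internal code.
interface CancelToken {
  cancelled: boolean;
}

const pause = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

// Retries `attempt` until it succeeds or the token is cancelled.
// Resolves to true on success and to false when cancelled first.
async function retryUntilCancelled(
  attempt: () => Promise<void>,
  token: CancelToken,
  retryIntervalMs: number
): Promise<boolean> {
  while (!token.cancelled) {
    try {
      await attempt();
      return true;
    } catch {
      // Swallow the failure and wait before the next attempt.
      await pause(retryIntervalMs);
    }
  }
  return false;
}
```

With a loop of this shape, a close() implementation or a signal handler would only need to set token.cancelled = true for the pending promise to settle after the current wait.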

Test setup improvement and more tests

This module has very few tests, and none at all for some scenarios. It needs to move from "production tested" to a proper test suite.
I think this module needs the following tests.

Features the module promises:

  • Reconnection and error handling: no tests
  • Continuous health checks: no tests
  • Proper error handling: very few tests

Other tests could be:

  • "Stress testing": trigger hundreds or thousands of events in the Postgres database and handle them
  • Module imports (CommonJS require, ES module import)
  • Listening to many channels with a single subscriber

Does anyone have a nodejs example?

Hi Guys,

just trying to get a really simple nodejs app running that just listens for notifications, prints the payloads and sits waiting for the next one.

I'm buggered if I can work out how to make this happen with pg-listen, it just drops straight through, obviously my lack of knowledge, but can anyone help out?

Ta.

Default export could be improved

The "default" property has to be accessed explicitly when the module is imported with require:

const createPostgresSubscriber = require('pg-listen').default

This feels weird
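
Until the export changes, a small interop helper can hide the difference on the consumer side. A minimal sketch, assuming only that the module exposes the factory either directly or under a default property; resolveDefault is a hypothetical name:

```typescript
// Hypothetical helper: returns the `default` export when one exists,
// otherwise the module value itself.
function resolveDefault<T>(mod: unknown): T {
  if (
    (typeof mod === "object" || typeof mod === "function") &&
    mod !== null &&
    "default" in mod
  ) {
    return (mod as { default: T }).default;
  }
  return mod as T;
}

// Intended use (not executed here):
// const createPostgresSubscriber = resolveDefault(require("pg-listen"))
```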

Where to pass options

Hello,
I need to change the retryTimeout but I can't figure out where to pass the options parameter. For the subscriber interface the connect function doesn't take the options parameters so I am not sure how to pass them.

Thanks!

Incorrect processing of notifications without payload

Hi!

If an app receives an empty notification, done like this (without payload)

NOTIFY "channel";

Then, it crashes with:

Fatal database connection error: SyntaxError: Error parsing PostgreSQL notification payload: Unexpected end of JSON input

I think the problem is here:

payload = notification.payload !== undefined ? parse(notification.payload) : undefined

As notification.payload is an empty string in this case.

Can be fixed with this or a similar check:

payload = notification.payload !== undefined && notification.payload !== '' ? parse(notification.payload) : undefined
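
As a standalone sketch of that guard: decodePayload is a hypothetical helper, and defaulting parse to JSON.parse is an assumption made for the example.

```typescript
// Hypothetical helper mirroring the check discussed above; `parse`
// defaults to JSON.parse here as an assumption.
function decodePayload(
  raw: string | undefined,
  parse: (serialized: string) => unknown = JSON.parse
): unknown {
  // Both conditions must hold, hence `&&`: a payload-less NOTIFY
  // arrives as an empty string, which JSON.parse rejects.
  return raw !== undefined && raw !== "" ? parse(raw) : undefined;
}
```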

Integration with node-postgres

Instead of a connection string, I would like to pass an existing node-postgres (pg) instance in the options. Could you add support for that, for example by taking the connection settings from the node-postgres instance?

error event handler is never removed

When reinitializing, an 'error' event handler is added to the old client, which is not later removed:

dbClient.once("error", error => connectionLogger(`Previous DB client errored after reconnecting already:`, error))

Perhaps an idea to wait for the client to end and then remove the listeners again?

await dbClient.end();
dbClient.removeAllListeners();

Channel with hyphen does not work

I tried to listen on the example channel 'my-channel' using psql:

# listen my-channel;
ERROR:  syntax error at or near "-"
LINE 1: listen my-channel;
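
PostgreSQL treats the channel in LISTEN as an identifier, so a name containing a hyphen has to be double-quoted, as in LISTEN "my-channel";. A minimal sketch of building such a statement from code, with quoteChannel as a hypothetical helper name:

```typescript
// Hypothetical helper: wraps a channel name in double quotes and doubles
// any embedded double quote, following PostgreSQL's quoted-identifier rules.
function quoteChannel(channel: string): string {
  return `"${channel.replace(/"/g, '""')}"`;
}

const listenStatement = `LISTEN ${quoteChannel("my-channel")};`;
console.log(listenStatement); // LISTEN "my-channel";
```

Keep in mind that quoted identifiers are case-sensitive, so the NOTIFY side has to use exactly the same spelling.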

subscriber.notify() example is missing the channel

The call to subscriber.notify() in the README's example is missing the channel argument.
To go with the example provided, it should rather look like this:

export async function sendSampleMessage () {
  await subscriber.notify("my-channel", {
    greeting: "Hey, buddy.",
    timestamp: Date.now()
  })
}

Make Subscriber interface generic

Make the Subscriber interface generic so that I can define custom notification events and payload types, like so:

interface NotificationEvents {
  'inserted-row': {
    tableName: string;
    row: Record<string, unknown>;
  };
}

export default class PgListenerService extends BaseService {
  subscriber: Subscriber<NotificationEvents>;

  constructor() {
    super();
    this.subscriber = createSubscriber({ connectionString: envs.pgDbUrl });
    process.on('exit', () => {
      this.subscriber.close();
    });
  }

  async init() {
    this.subscriber.notifications.on('inserted-row', payload => {
      payload.tableName; // string instead of any
    });
    await this.subscriber.connect();
    await this.subscriber.listenTo('inserted-row');
  }
}
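
To show what the requested typing could look like, here is a minimal sketch of an event map driving handler types. TypedNotifications is a hypothetical stand-in written for this example; pg-listen's real Subscriber type may be shaped differently.

```typescript
// Hypothetical minimal emitter, for illustration only.
type Handler<P> = (payload: P) => void;

class TypedNotifications<Events> {
  private handlers = new Map<keyof Events, Handler<any>[]>();

  // The payload type of the handler is looked up from the event map.
  on<K extends keyof Events>(channel: K, handler: Handler<Events[K]>): void {
    const list = this.handlers.get(channel) ?? [];
    list.push(handler);
    this.handlers.set(channel, list);
  }

  emit<K extends keyof Events>(channel: K, payload: Events[K]): void {
    for (const handler of this.handlers.get(channel) ?? []) {
      handler(payload);
    }
  }
}

interface RowEvents {
  "inserted-row": { tableName: string; row: Record<string, unknown> };
}

const typed = new TypedNotifications<RowEvents>();
typed.on("inserted-row", payload => {
  console.log(payload.tableName); // typed as string, not any
});
typed.emit("inserted-row", { tableName: "users", row: {} });
```

Passing an unknown channel name or a payload with the wrong shape would then be rejected at compile time.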

Add a "connected" event.

It's often that you need to perform some work to get back into a consistent state after losing your connection or after the initial connection is established (because you don't know what happened with the DB in the meantime).

The reconnect kinda works for this, but it's emitted before the connection comes back.

It would be nice to have a general connect[ed]/reconnected event for this.
