Giter Site home page Giter Site logo

nestjs-nats-streaming-transport's Introduction

Nats Streaming Transport Module for NestJS

Build Event Driven Microservices Architecture with Nats Streaming Server and NestJS.

  • Log based persistence
  • At-Least-Once Delivery model, giving reliable message delivery
  • Rate matched on a per subscription basis
  • Replay/Restart
  • Last Value Semantics

Exposes the node-nats-streaming library through @ctx context object.

Install

npm i @nestjs/microservices
npm i @nestjs-plugins/nestjs-nats-streaming-transport

Running nats-streaming-server in docker

docker run -p 4222:4222 -p 8222:8222 nats-streaming -m 8222 -cid 'my-cluster' -SD

TransportConnectOptions

  • ackTimeout (number: default: 30000) - Timeout for the server to receive acknowledgement messages from the client in milliseconds.
  • connectTimeout (number, default: 2000 ) - Timeout for the client to receive request responses from the nats-streaming-server in milliseconds.
  • discoverPrefix (string, defeult: _STAN.discover) - Subject prefix used to discover nats-streaming-servers (must match server).
  • encoding (string, default: utf8) - Encoding used by stan to decode strings.
  • maxPingOut (number, default: 3) - Maximum number of missing pongs from the nats-streaming-server before the connection is lost and closed.
  • maxPubAcksInflight (number, default: 16384) - Maximum number of messages a publisher may have in flight without acknowledgment.
  • maxReconnectAttempts (number, default: -1) - Maximum number of reconnect attempts (infinite = -1)
  • name (string, default:) - Optional client name
  • nc - (Stan, default: ) - nats client
  • nkey - (string, default:) - nkeys authentication
  • noRandomize (boolean: default: false) - If set, the order of user-specified servers is randomized.
  • nonceSigner (Function, default:) - A function that takes a Buffer and returns a nkey signed signature.
  • pass (string, default:) - Sets the password for a connection
  • pedantic (boolean, default false) - Turns on strict subject format checks
  • pingInterval (number, default: 120000) - Number of milliseconds between client-sent pings
  • reconnect (boolean, false) - If false server will not attempt reconnecting
  • reconnectTimeWait (number, default: 2000)- If disconnected, the client will wait the specified number of milliseconds between reconnect attempts.
  • servers (string[], default:) - Array of connection urls.
  • stanEncoding () -
  • stanMaxPingOut () -
  • stanPingInterval (number, default: 5000) - Client ping interval to the nats-streaming-server in milliseconds.
  • tls (boolean, default: false) - This property can be a boolean or an Object. If true the client requires a TLS connection. If false a non-tls connection is required. The value can also be an object specifying TLS certificate data. The properties ca, key, cert should contain the certificate file data. ca should be provided for self-signed certificates. key and cert are required for client provided certificates. rejectUnauthorized if true validates server's credentials
  • token (string, default:) - Sets a authorization token for a connection
  • tokenHandler (Function, default:) - A function returning a token used for authentication.
  • url (string, default: nats://localhost:4222) - Connection url
  • useOldRequestStyle (boolean: false) - If set to true calls to request() and requestOne() will create an inbox subscription per call.
  • user (string, default:) - Sets the username for a connection
  • userCreds (string, default: '') - Set with NATS.creds()
  • userJWT (string, default: '') - The property can be a JWT or a function that returns a JWT.
  • verbose (boolean, default: false) - Turns on +OK protocol acknowledgements
  • waitOnFirstConnect (boolean, default: false) - If true the server will fall back to a reconnect mode if it fails its first connection attempt.
  • yieldTime (number, default:) - If set, processing will yield at least the specified number of milliseconds to IO callbacks before processing inbound messages

TransportSubscriptionOptions

  • deliverAllAvailable (string, default: false) - Receive all stored values in order
  • durableName (string, default: "") - Track the last acknowledged message for that clientID + durable name. Only messages since the last acknowledged message will be delivered to the client.
  • manualAckMode (boolean, default: false) - Manual acknowledgement mode on the subscription. Default is to automatically acknowledge messsages.
  • maxInFligth (number, default: 0) - Specifies the maximum number of outstanding acknowledgements.
  • startAt (Nats.StartPosition, default null) - Subscribe starting at a specific time
  • startAtSequence (number, default: 0) - Receive all messages starting at a specific sequence number
  • startAtTimeDelta (number: default:0) - Subscribe starting at a specific amount of time in the past (e.g. 30 seconds ago)
  • startTime (Date, default: null) - Subscribe starting at a specific time
  • startWithLastReceived (boolean, default: false) - Subscribe starting with the most recently published value

Read more about nats-streaming-server

Read more about node-nats-streaming

nestjs-nats-streaming-transport - code examples:

A simple Event interface used in this example

  // @app/events;
  
  export interface UserCreatedEvent {
    id: number,
    username: string
}

A simple enum to describe pattern used as subjects.

// '@app/events

export enum Patterns {
  UserCreated = 'user:created'  
}

Setup event Publisher

// app.module.ts
 
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { NatsStreamingTransport } from '@nestjs-plugins/nestjs-nats-streaming-transport'

@Module({
  imports: [
     NatsStreamingTransport.register(
       {
        clientId: 'user-service-publisher',
        clusterId: 'my-cluster',
        connectOptions: {
          url: 'http://127.0.0.1:4222',
        },
      }
     ),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Publish an Event

// app.service.ts

import { Injectable } from '@nestjs/common';
import { Publisher } from '@nestjs-plugins/nestjs-nats-streaming-transport';
import { UserCreatedEvent, Patterns } from '@app/events'

@Injectable()
export class AppService {

  constructor(private publisher: Publisher) { }

  getHello(): string {
    const event: UserCreatedEvent = { id: Math.floor(Math.random() * Math.floor(1000)), username: 'bernt' }
    this.publisher.emit<string, UserCreatedEvent>(Patterns.UserCreated, event).subscribe(guid => {
      console.log('published message with guid:', guid)
    })
    return `published message: ${JSON.stringify(event)}`
  }
}

Setup Event Listener

// main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Listener } from '@nestjs-plugins/nestjs-nats-streaming-transport'
import { CustomStrategy } from '@nestjs/microservices';
async function bootstrap() {

  const options: CustomStrategy = {
    strategy: new Listener(
      'my-cluster' /* clusterID */,
      'user-service-listener' /* clientID */,
      'user-service-group', /* queueGroupName */
      {
        url: 'http://127.0.0.1:4222'
      } /* TransportConnectOptions */,
      {
        durableName: 'user-queue-group',
        manualAckMode: true,
        deliverAllAvailable: true,
      } /* TransportSubscriptionOptions */ ,
    ),
  };
 
  // hybrid microservice and web application
  const app = await NestFactory.create(AppModule);
  const microService = app.connectMicroservice(options)
  microService.listen(() => app.listen(3000))
}
bootstrap();

Subscribe Handler

// app.controller.ts

import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';
import { EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { NatsStreamingContext } from '@nestjs-plugins/nestjs-nats-streaming-transport';
import { Patterns } from '@app/events';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) { }

  @Get()
  getHello(): string {
    return this.appService.getHello();
  }

  @EventPattern(Patterns.UserCreated)
  public async stationCreatedHandler(@Payload() data: { id: number, name: string }, @Ctx() context: NatsStreamingContext) {
    console.log(`received message: ${JSON.stringify(data)}`)
    context.message.ack()
  }
}

nestjs-nats-streaming-transport's People

Contributors

kenguru33 avatar dependabot[bot] avatar

Watchers

James Cloos avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.