Giter Site home page Giter Site logo

datasource-proxy-r2dbc's Introduction

datasource-proxy-r2dbc

Maven Central

datasource-proxy for R2DBC.

About

Provide listeners that receive callbacks of query executions and method invocations on R2DBC SPI.

Callbacks are:

  • Before/After query executions when Batch#execute() or Statement#execute() is called.
  • Before/After any method calls on ConnectionFactory, Connection, Batch and Statement.
  • (Each query result emitted by Publisher<? extends Result>.) (TBD)

Here is sample use cases for listeners:

  • Query logging
  • Slow query detection
  • Tracing
  • Metrics
  • Assertion/Verification
    • Connection leak detection
    • Transaction check
  • Custom logic injection
  • etc.

Use cases

Query logging

When query is executed by Batch#execute() or Statement#execute(), listener receives query callbacks. The callback contains query execution information(QueryExecutionInfo) such as query string, execution type, bindings, execution time, etc.
You could output/log the information.

Sample Output (wrapped for display purpose):

# Statement with no bindings
# 
Thread:reactor-tcp-nio-1(30) Connection:1
Transaction:{Create:1 Rollback:0 Commit:0}
Success:True Time:34
Type:Statement BatchSize:0 BindingsSize:0
Query:["SELECT value FROM test"], Bindings:[]

# Batch query
#
Thread:reactor-tcp-nio-3(32) Connection:2
Transaction:{Create:1 Rollback:0 Commit:0}
Success:True Time:4
Type:Batch BatchSize:2 BindingsSize:0
Query:["INSERT INTO test VALUES(200)","SELECT value FROM test"], Bindings:[]

# Statement with multiple bindings
#
Thread:reactor-tcp-nio-1(30) Connection:3
Transaction:{Create:1 Rollback:0 Commit:0}
Success:True Time:21
Type:Statement BatchSize:0 BindingsSize:2
Query:["INSERT INTO test VALUES ($1,$2)"], Bindings:[(100,101),(200,null(int))]

Slow query detection

There are two types of slow query detection.

  • Detect slow query AFTER query has executed.
  • Detect slow query WHILE query is running.

Former is simple. On afterQuery callback, check the execution time. If it took more than threashold, perform an action such as logging, send notification, etc.

To perform some action while query is still executing and passed threashold time, one implementation is to create a watcher that checks running queries and notify ones exceeded the threshold.
It is currently in plan to port SlowQueryListener from datasource-proxy.

Method tracing

When any methods on ConnectionFactory, Connection, Batch, or Statement are called, listeners receive callbacks on before and after invocations.

Below output simply printed out the method execution information(MethodExecutionInfo) at each method invocation.
Essentially, this shows interaction with R2DBC SPI.

You could even call distributed tracing system to create span for the actions such as connection open/close.

Sample: Execution with transaction (see sample):

  1: Thread:34 Connection:1 Time:16  PostgresqlConnectionFactory#create()
  2: Thread:34 Connection:1 Time:0  PostgresqlConnection#createStatement()
  3: Thread:34 Connection:1 Time:0  ExtendedQueryPostgresqlStatement#bind()
  4: Thread:34 Connection:1 Time:0  ExtendedQueryPostgresqlStatement#add()
  5: Thread:34 Connection:1 Time:5  PostgresqlConnection#beginTransaction()
  6: Thread:34 Connection:1 Time:5  ExtendedQueryPostgresqlStatement#execute()
  7: Thread:34 Connection:1 Time:3  PostgresqlConnection#commitTransaction()
  8: Thread:34 Connection:1 Time:4  PostgresqlConnection#close()

Distributed Tracing

Sample project: Tracing with sleuth

Tracing Tracing

Connection Span Connection Span

Query Span Query Span

Metrics

On every callback, any obtained information can update metrics.

For example, on method execution, number of opened connections, number of rollbacks, method execution time, etc; for query execution, number of queries, type of query (SELECT, DELETE, ...), execution time, etc. can be used for metrics.

Assertion/Verification

By inspecting invoked methods and/or executed queries, you can verify your logic has performed as expected.

For example, by keeping track of connection open/close method calls, connection leaks can be detected or verified.

Another example is to check group of the target queries are executed on the same connection. This could verify the premise of transaction that queries need to be performed on the same connection in order to be in the same transaction.

Custom logic injection

Any logic can be performed on callbacks. Thus, you can write own logic that performs anything, such as audit logging, sending notifications, calling external system, etc.

Samples

Sample projects: datasource-proxy-r2dbc-samples


API

Listener API

ProxyExecutionListener is the listener interface. This defines callbacks for method invocation, query execution, and query result processing.

// invoked before any method on proxy is called
void beforeMethod(MethodExecutionInfo executionInfo);

// invoked after any method on proxy is called
void afterMethod(MethodExecutionInfo executionInfo);

// invoked before query gets executed
void beforeQuery(QueryExecutionInfo execInfo);

// invoked after query is executed
void afterQuery(QueryExecutionInfo execInfo);

// invoked on processing(subscribing) each query result
void eachQueryResult(QueryExecutionInfo execInfo);

MethodExecutionInfo and QueryExecutionInfo contains contextual information about the method/query execution.

Any method calls on proxied ConnectionFactory, Connection, Batch, and Statement triggers method callbacks - beforeMethod() and afterMethod().
Batch#execute() and Statement#execute() triggers query callbacks - beforeQuery() and afterQuery().(Specifically when returned result publisher is subscribed.)
eachQueryResult() is called on each query result while being subscribed.

LifeCycleListener

LifeCycleListener provides before/after methods for all methods defined on ConnectionFactory, Connection, Batch, and Statement, as well as query executions. This listener is built on top of method and query interceptor API on ProxyExecutionListener.

For example, if you want know the creation of connection and close of it:

public class ConnectionStartToEndListener implements LifeCycleListener {

  @Override
  public void beforeCreateOnConnectionFactory(MethodExecutionInfo methodExecutionInfo) {
    // callback at ConnectionFactory#create()
  }

  @Override
  public void afterCloseOnConnection(MethodExecutionInfo methodExecutionInfo) {
    // callback at Connection#close()
  }

  }

QueryExecutionInfoFormatter

This class converts QueryExecutionInfo to String. Mainly used for preparing log entries.
Internally, this class has multiple consumers for QueryExecutionInfo and loop through them to populate the output StringBuilder.

This class implements Function<QueryExecutionInfo,String> and can be used in functional style as well.

// convert all info
QueryExecutionInfoFormatter formatter = QueryExecutionInfoFormatter#showAll();
String str = formatter.format(queryExecutionInfo);

// customize conversion
QueryExecutionInfoFormatter formatter = new QueryExecutionInfoFormatter();
formatter.addConsumer((execInfo, sb) -> {
  sb.append("MY-QUERY-EXECUTION="); // add prefix
};
formatter.newLine();  // new line
formatter.showSuccess();
formatter.showConnection((execInfo, sb)  -> {
    // custom conversion
    sb.append("MY-ID=" + executionInfo.getConnectionInfo().getConnectionId());
});
formatter.showQuery();

// convert it
String str = formatter.format(queryExecutionInfo);

MethodExecutionInfoFormatter

Similar to QueryExecutionInfoFormatter, MethodExecutionInfoFormatter converts MethodExecutionInfo to String.

MethodExecutionInfoFormatter formatter = MethodExecutionInfoFormatter.withDefault();

ProxyConnectionFactoryBuilder.create(connectionFactory)
  .onAfterMethod(execInfo ->
     execInfo.map(methodExecutionFormatter::format)  // convert
       .doOnNext(System.out::println)  // print out to sysout
       .subscribe())
  .build();

Setup

Use ProxyConnectionFactoryBuilder to create a proxied ConnectionFactory and pass it to R2DBC client.

// original connection factory
ConnectionFactory connectionFactory = new PostgresqlConnectionFactory(configuration);

// create proxied connection factory
ConnectionFactory proxyConnectionFactory =
  ProxyConnectionFactoryBuilder.create(connectionFactory)  // pass original ConnectionFactory
    .onAfterMethod(mono -> {
      ...   // callback after method execution
    })
    .onEachQueryResult(mono -> {
      ...   // callback for each result 
    })
    .onAfterQuery(mono -> {
      ...  //  callback after query execution
    })
    .build();

// initialize client with the wrappd connection factory
R2dbc client = new R2dbc(proxyConnectionFactory);

Install

  • local maven install

    ./mvnw install
  • jitpack

Versions

datasource-proxy-r2dbc is developed on following versions.

datasource-proxy-r2dbc r2dbc-spi reactor-core
0.2-SNAPSHOT 1.0.0.M6 Californium-SR2
0.1 1.0.0.M6 Californium-SR2

NOTE:
Currently, it is built on milestone release of r2dbc-spi.
To get milestone releases, spring-milestones repo needs to be added.

<repositories>
  <repository>
    <id>spring-milestones</id>
    <name>Spring Milestones</name>
    <url>https://repo.spring.io/milestone</url>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
</repositories>

Maven and Gradle

Maven

<dependency>
  <groupId>net.ttddyy</groupId>
  <artifactId>datasource-proxy-r2dbc</artifactId>
  <version>${latest-version}</version>
</dependency>

Gradle

compile "net.ttddyy:datasource-proxy-r2dbc:${latest-version}"

NOTE: artifactId may change in future.


Sample Configuration

Query logging

On after query callback, write out executed query information. This can be done in before query execution; however, some of the attributes are only available at after query execution such as execution time, successfully executed, etc.

QueryExecutionInfoFormatter, which converts QueryExecutionInfo to String, can be used out of the box to generate log statements.

QueryExecutionInfoFormatter queryExecutionFormatter = QueryExecutionInfoFormatter.showAll();

ConnectionFactory proxyConnectionFactory =
  ProxyConnectionFactoryBuilder.create(connectionFactory)  // wrap original ConnectionFactory
    // on every query execution
    .onAfterQuery(execInfo ->
      execInfo.map(queryExecutionFormatter::format)    // convert QueryExecutionInfo to String
              .doOnNext(System.out::println)       // print out executed query
              .subscribe())
    .build();

Slow query detection

Detect slow query AFTER query has executed

On after query execution, check whether the query execution time has exceeded the threashold time, then perform any action.

Duration threashold = Duration.of(...);

ConnectionFactory proxyConnectionFactory =
  ProxyConnectionFactoryBuilder.create(connectionFactory)
    .onAfterQuery(mono -> mono
       .filter(execInfo -> threashold.minus(execInfo.getExecuteDuration()).isNegative())
       .doOnNext(execInfo -> {
         // slow query logic
       })
       .subscribe())
    .build();

Detect slow query WHILE query is executing

TBD for slow query detection while query is executing.

Method tracing

At each invocation of methods, perform action such as printing out the invoked method, create a span, or update metrics.

MethodExecutionInfoFormatter is used to generate log string.

MethodExecutionInfoFormatter methodExecutionFormatter = MethodExecutionInfoFormatter.withDefault();

ConnectionFactory proxyConnectionFactory =
  ProxyConnectionFactoryBuilder.create(connectionFactory)  // wrap original ConnectionFactory
    // on every method invocation
    .onAfterMethod(execInfo ->  
      execInfo.map(methodExecutionFormatter::format)    // convert MethodExecutionInfo to String
              .doOnNext(System.out::println)        // print out method execution (method tracing)
              .subscribe())
    .build();

Sample

Client code:

// Simple Transaction Example
getR2dbc()
  .withHandle(handle -> handle
    .inTransaction(h -> h.execute("INSERT INTO test VALUES ($1)", 200)))
  .subscribe();

Setup:

// converter: from execution info to String
QueryExecutionInfoFormatter queryExecutionFormatter = QueryExecutionInfoFormatter.showAll();
MethodExecutionInfoFormatter methodExecutionFormatter = MethodExecutionInfoFormatter.withDefault();

ConnectionFactory proxyConnectionFactory =
  ProxyConnectionFactoryBuilder.create(connectionFactory)  // wrap original ConnectionFactory
    // on every method invocation
    .onAfterMethod(execInfo ->  
      execInfo.map(methodExecutionFormatter::format)
              .doOnNext(System.out::println)        // print out method execution (method tracing)
              .subscribe())
    // on every query execution
    .onAfterQuery(execInfo ->
      execInfo.map(queryExecutionFormatter::format)
              .doOnNext(System.out::println)       // print out executed query
              .subscribe())
    .build();

// pass the proxied ConnectionFactory to client
this.r2dbc = new R2dbc(proxyConnectionFactory);

Method tracing output:

  1: Thread:34 Connection:1 Time:16  PostgresqlConnectionFactory#create()
  2: Thread:34 Connection:1 Time:0  PostgresqlConnection#createStatement()
  3: Thread:34 Connection:1 Time:0  ExtendedQueryPostgresqlStatement#bind()
  4: Thread:34 Connection:1 Time:0  ExtendedQueryPostgresqlStatement#add()
  5: Thread:34 Connection:1 Time:5  PostgresqlConnection#beginTransaction()
  6: Thread:34 Connection:1 Time:5  ExtendedQueryPostgresqlStatement#execute()
  7: Thread:34 Connection:1 Time:3  PostgresqlConnection#commitTransaction()
  8: Thread:34 Connection:1 Time:4  PostgresqlConnection#close()

Query output: (wrapped for display)

Thread:reactor-tcp-nio-1(30) Connection:1
Transaction:{Create:1 Rollback:0 Commit:0}
Success:True Time:32
Type:Statement BatchSize:0 BindingsSize:1 
Query:["INSERT INTO test VALUES ($1)"] Bindings:[(200)]

datasource-proxy-r2dbc's People

Contributors

ttddyy avatar

Watchers

 avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.