Giter Site home page Giter Site logo

oracle-r2dbc's Introduction

About Oracle R2DBC

The Oracle R2DBC Driver is a Java library that supports reactive programming with Oracle Database.

Oracle R2DBC implements the R2DBC Service Provider Interface (SPI) as specified by the Reactive Relational Database Connectivity (R2DBC) project. The R2DBC SPI exposes Reactive Streams as an abstraction for remote database operations. Reactive Streams is a well defined standard for asynchronous, non-blocking, and back-pressured communication. This standard allows an R2DBC driver to interoperate with other reactive libraries and frameworks, such as Spring, Project Reactor, RxJava, and Akka Streams.

Learn More About R2DBC:

R2DBC Project Home Page

R2DBC Javadocs v1.0.0.RELEASE

R2DBC Specification v1.0.0.RELEASE

Learn More About Reactive Streams:

Reactive Streams Project Home Page

Reactive Streams Javadocs v1.0.3

Reactive Streams Specification v1.0.3

About This Version

The 1.2.0 release Oracle R2DBC implements version 1.0.0.RELEASE of the R2DBC SPI.

Fixes in this release:

New features in this release:

Updated dependencies:

  • Updated Oracle JDBC from 21.7.0.0 to 21.11.0.0
  • Updated Project Reactor from 3.5.0 to 3.5.11

Installation

Oracle R2DBC can be obtained from Maven Central.

<dependency>
  <groupId>com.oracle.database.r2dbc</groupId>
  <artifactId>oracle-r2dbc</artifactId>
  <version>1.2.0</version>
</dependency>

Oracle R2DBC can also be built from source using Maven: mvn clean install -DskipTests=true

If -DskipTests=true is omitted from the command above, then it will execute end-to-end tests which connect to an Oracle Database. Tests read the connection configuration from src/test/resources/config.properties.

Oracle R2DBC is compatible with JDK 11 (or newer), and has the following runtime dependencies:

  • R2DBC SPI 1.0.0.RELEASE
  • Reactive Streams 1.0.3
  • Project Reactor 3.5.11
  • Oracle JDBC 21.11.0.0 for JDK 11 (ojdbc11.jar)

The Oracle R2DBC Driver has been verified with Oracle Database versions 18, 19, 21, and 23.

Integration with Spring and Other Libraries

Oracle R2DBC can only interoperate with libraries that support the 1.0.0.RELEASE version of the R2DBC SPI. When using libraries like Spring and r2dbc-pool, be sure to use a version which supports the 1.0.0.RELEASE of the SPI.

Oracle R2DBC depends on the JDK 11 build of Oracle JDBC 21.11.0.0. Other libraries may depend on a different version of Oracle JDBC, and this version may be incompatible. To resolve incompatibilities, it may be necessary to explicitly declare the dependency in your project, ie:

<dependency>
    <groupId>com.oracle.database.jdbc</groupId>
    <artifactId>ojdbc11</artifactId>
    <version>21.11.0.0</version>
</dependency>

Code Examples

The following method returns an Oracle R2DBC ConnectionFactory

  static ConnectionFactory getConnectionFactory() {
    String user = getUser();
    char[] password = getPassword();
    try {
      return ConnectionFactories.get(
        ConnectionFactoryOptions.builder()
          .option(ConnectionFactoryOptions.DRIVER, "oracle")
          .option(ConnectionFactoryOptions.HOST, "db.host.example.com")
          .option(ConnectionFactoryOptions.PORT, 1521)
          .option(ConnectionFactoryOptions.DATABASE, "db.service.name")
          .option(ConnectionFactoryOptions.USER, user)
          .option(ConnectionFactoryOptions.PASSWORD, CharBuffer.wrap(password))
          .build());
    }
    finally {
      Arrays.fill(password, (char)0);
    }
  }

The following method uses Project Reactor's Flux to open a connection, execute a SQL query, and then close the connection:

Flux.usingWhen(
  getConnectionFactory().create(),
  connection ->
    Flux.from(connection.createStatement(
      "SELECT 'Hello, Oracle' FROM sys.dual")
      .execute())
      .flatMap(result ->
        result.map(row -> row.get(0, String.class))),
  Connection::close)
  .doOnNext(System.out::println)
  .doOnError(Throwable::printStackTrace)
  .subscribe();

When executed, the code above will asynchronously print the result of the SQL query.

The next example uses a named parameter marker, :locale_name, in the SQL command:

Flux.usingWhen(
  getConnectionFactory().create(),
  connection ->
    Flux.from(connection.createStatement(
      "SELECT greeting FROM locale WHERE locale_name = :locale_name")
      .bind("locale_name", "France")
      .execute())
      .flatMap(result ->
        result.map(row ->
          String.format("%s, Oracle", row.get("greeting", String.class)))),
  Connection::close)
  .doOnNext(System.out::println)
  .doOnError(Throwable::printStackTrace)
  .subscribe();

Like the previous example, executing the code above will asynchronously print a greeting message. "France" is set as the bind value for locale_name, so the query should return a greeting like "Bonjour" when row.get("greeting") is called.

Additional code examples can be found here.

Help

For help programming with Oracle R2DBC, ask questions on Stack Overflow tagged with [oracle] and [r2dbc]. The development team monitors Stack Overflow regularly.

Issues may be opened as described in our contribution guide.

Contributing

This project welcomes contributions from the community. Before submitting a pull request, please review our contribution guide.

Security

Please consult the security guide for our responsible security vulnerability disclosure process.

License

Copyright (c) 2021, 2023 Oracle and/or its affiliates.

This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

Documentation

This document specifies the behavior of the R2DBC SPI as implemented for the Oracle Database. This SPI implementation is referred to as the "Oracle R2DBC Driver" or "Oracle R2DBC" throughout the remainder of this document.

The Oracle R2DBC Driver implements behavior specified by the R2DBC 1.0.0.RELEASE Specification and Javadoc

Publisher objects created by Oracle R2DBC implement behavior specified by the Reactive Streams 1.0.3 Specification and Javadoc

The R2DBC and Reactive Streams specifications include requirements that are optional for a compliant implementation. Oracle R2DBC's implementation of these optional are specified in this document. This document also specifies additional functionality that is supported by Oracle R2DBC, but is not part of the R2DBC 1.0.0 Specification.

Connection Creation

The Oracle R2DBC Driver is identified by the name "oracle". The driver implements a ConnectionFactoryProvider located by an R2DBC URL identifing "oracle" as a driver, or by a DRIVER ConnectionFactoryOption with the value of "oracle".

Support for Standard R2DBC Options

The following standard ConnectionFactoryOptions are supported by Oracle R2DBC:

  • DRIVER
  • HOST
  • PORT
  • DATABASE
    • The database option is interpreted as the service name of an Oracle Database instance. System Identifiers (SID) are not recognized.
  • USER
  • PASSWORD
  • SSL
  • CONNECT_TIMEOUT
  • STATEMENT_TIMEOUT.
  • PROTOCOL
    • Accepted protocol values are "tcps", "ldap", and "ldaps"

Support for Extended R2DBC Options

Oracle R2DBC extends the standard set of R2DBC options to offer functionality that is specific to Oracle Database and the Oracle JDBC Driver. Extended options are declared in the OracleR2dbcOptions class.

Support for Supplier and Publisher as Option Values

Most options can have a value provided by a Supplier or Publisher.

Oracle R2DBC requests the value of an Option from a Supplier or Publisher each time the Publisher returned by ConnectionFactory.create() creates a new Connection. Each Connection can then be configured with values that change over time, such as a password which is periodically rotated.

If a Supplier provides the value of an Option, then Oracle R2DBC requests the value by invoking Supplier.get(). If get() returns null, then no value is configured for the Option. If get() throws a RuntimeException, then it is set as the initial cause of an R2dbcException emitted by the Publisher returned by ConnectionFactory.create(). The Supplier must have a thread safe get() method, as multiple subscribers may request connections concurrently.

If a Publisher provides the value of an Option, then Oracle R2DBC requests the value by subscribing to the Publisher and signalling demand. The first value emitted to onNext will be used as the value of the Option. If the Publisher emits onComplete before onNext, then no value is configured for the Option. If the Publisher emits onError before onNext, then the Throwable is set as the initial cause of an R2dbcException emitted by the Publisher returned by ConnectionFactory.create().

The following example configures the PASSWORD option with a Supplier:

  void configurePassword(ConnectionFactoryOptions.Builder optionsBuilder) {
  
    // Cast the PASSWORD option
    Option<Supplier<CharSequence>> suppliedOption = OracleR2dbcOptions.supplied(PASSWORD);
    
    // Supply a password
    Supplier<CharSequence> supplier = () -> getPassword();
    
    // Configure the builder
    optionsBuilder.option(suppliedOption, supplier); 
  }

A more concise example configures TLS_WALLET_PASSWORD as a Publisher

  void configurePassword(ConnectionFactoryOptions.Builder optionsBuilder) {
    optionsBuilder.option(
      OracleR2dbcOptions.published(TLS_WALLET_PASSWORD),
      Mono.fromSupplier(() -> getWalletPassword()));
  }

These examples use the supplied(Option) and published(Option) methods declared by oracle.r2dbc.OracleR2dbcOptions. These methods cast an Option<T> to Option<Supplier<T>> and Option<Publisher<T>>, respectively. It is necessary to cast the generic type of the Option when calling ConnectionFactoryOptions.Builder.option(Option<T>, T) in order for the call to compile and not throw a ClassCastException at runtime. It is not strictly required that supplied(Option) or published(Option) be used to cast the Option. These methods are only meant to offer code readability and convenience.

Note that the following code would compile, but fails at runtime with a ClassCastException:

  void configurePassword(ConnectionFactoryOptions.Builder optionsBuilder) {
    Publisher<CharSequence> publisher = Mono.fromSupplier(() -> getPassword());
    // Doesn't work. Throws ClassCastException at runtime:
    optionsBuilder.option(PASSWORD, PASSWORD.cast(publisher));
  }

To avoid a ClassCastException, the generic type of an Option must match the actual type of the value passed to ConnectionFactoryOptions.Builder.option(Option<T>, T).

For a small set of options, providing values with a Supplier or Publisher is not supported:

  • DRIVER
  • PROTOCOL

Providing values for these options would not be interoperable with io.r2dbc.spi.ConnectionFactories and r2dbc-pool.

Normally, Oracle R2DBC will not retain references to Option values after ConnectionFactories.create(ConnectionFactoryOptions) returns. However, if the value of at least one Option is provided by a Supplier or Publisher, then Oracle R2DBC will retain a reference to all Option values until the ConnectionFactory.create() Publisher emits a Connection or error. This is important to keep in mind when Option values may be mutated. In particular, a password may only be cleared from memory after the create() Publisher emits a Connection or error.

Configuring an Oracle Net Descriptor

The oracle.r2dbc.OracleR2dbcOptions.DESCRIPTOR option may be used to configure an Oracle Net Descriptor of the form (DESCRIPTION=...). If this option is used to configure a descriptor, then it is invalid to specify any other option that conflicts with information in the descriptor. Conflicting options include HOST, PORT, DATABASE, and SSL. These options all conflict with information that appears in a descriptor.

The DESCRIPTOR option has the name oracle.r2dbc.descriptor. This name can be used to configure a descriptor in the query section of an R2DBC URL:

r2dbc:oracle://?oracle.r2dbc.descriptor=(DESCRIPTION=...)

The DESCRIPTOR constant may also be used to configure a descriptor programmatically:

ConnectionFactoryOptions.builder()
  .option(OracleR2dbcOptions.DESCRIPTOR, "(DESCRIPTION=...)")

The DESCRIPTOR option may be set to an aliased entry of a tnsnames.ora file. Use the TNS_ADMIN option to specify the directory where tnsnames.ora is located:

r2dbc:oracle://?oracle.r2dbc.descriptor=myAlias&TNS_ADMIN=/path/to/tnsnames/

Configuring an LDAP URL

Use ldap or ldaps as the URL protocol to have an Oracle Net Descriptor retrieved from an LDAP server:

r2dbc:oracle:ldap://ldap.example.com:7777/sales,cn=OracleContext,dc=com
r2dbc:oracle:ldaps://ldap.example.com:7778/sales,cn=OracleContext,dc=com

Use a space separated list of LDAP URIs for fail over and load balancing:

r2dbc:oracle:ldap://ldap1.example.com:7777/sales,cn=OracleContext,dc=com%20ldap://ldap2.example.com:7777/sales,cn=OracleContext,dc=com%20ldap://ldap3.example.com:7777/sales,cn=OracleContext,dc=com

Space characters in a URL must be percent encoded as %20

An LDAP server request will block a thread for network I/O when Oracle R2DBC creates a new connection.

Configuring a java.util.concurrent.Executor

The oracle.r2dbc.OracleR2dbcOptions.EXECUTOR option configures a java.util.concurrent.Executor for executing asynchronous callbacks. The EXECUTOR option may be used to configure an Executor programmatically:

ConnectionFactoryOptions.builder()
  .option(OracleR2dbcOptions.EXECUTOR, getExecutor())

There is no way to configure an executor with a URL query parameter

If this option is not configured, then the common java.util.concurrent.ForkJoinPool is used as a default.

Configuring Oracle JDBC Connection Properties

A subset of Oracle JDBC's connection properties are also supported by Oracle R2DBC. These connection properties may be configured as options having the same name as the Oracle JDBC connection property, and may have CharSequence value types.

For example, the following URL configures the oracle.net.wallet_location connection property:

r2dbcs:oracle://db.host.example.com:1522/db.service.name?oracle.net.wallet_location=/path/to/wallet/

The same property can also be configured programmatically:

 ConnectionFactoryOptions.builder()
  .option(OracleR2dbcOptions.TLS_WALLET_LOCATION, "/path/to/wallet")

The next sections list Oracle JDBC connection properties which are supported by Oracle R2DBC.

TLS/SSL Connection Properties
Miscellaneous Connection Properties
Database Tracing Connection Properties
Oracle Net Encryption Connection Properties
Kerberos Connection Properties
LDAP Connection Properties

Thread Safety

Oracle R2DBC's ConnectionFactory and ConnectionFactoryProvider are the only classes that have a thread safe implementation. All other classes implemented by Oracle R2DBC are not thread safe. For instance, it is not safe for multiple threads to concurrently access a single instance of Result.

It is recommended to use a Reactive Streams library such as Project Reactor or RxJava to manage the consumption of non-thread safe objects

While it is not safe for multiple threads to concurrently access the same object, it is safe from them to do so with different objects from the same Connection. For example, two threads can concurrently subscribe to two Statement objects from the same Connection. When this happens, the two statements are executed in a "pipeline". Pipelining will be covered in the next section.

Pipelining

Pipelining allows Oracle R2DBC to send a call without having to wait for a previous call to complete. If all requirements are met, then pipelining will be activated by concurrently subscribing to publishers from the same connection. For example, the following code concurrently subscribes to two statements:

Flux.merge(
  connection.createStatement(
    "INSERT INTO example (id, value) VALUES (0, 'X')")
    .execute(),
  connection.createStatement(
    "INSERT INTO example (id, value) VALUES (1, 'Y')")
    .execute())

When the Publisher returned by merge is subscribed to, both INSERTs are immediately sent to the database. The network traffic can be visualized as:

TIME | ORACLE R2DBC     | NETWORK | ORACLE DATABASE
-----+------------------+---------+-----------------
   0 | Send INSERT-X    | ------> | WAITING
   0 | Send INSERT-Y    | ------> | WAITING
   1 | WAITING          | <------ | Send Result-X
   1 | WAITING          | <------ | Send Result-Y
   2 | Receive Result-X |         | WAITING
   2 | Receive Result-Y |         | WAITING

In this visual, 1 unit of TIME is required to transfer data over the network. The TIME column is only measuring network latency. It does not include computational time spent on executing the INSERTs.

The key takeaway from this visual is that the INSERTs are sent and received concurrently, rather than sequentially. Both INSERTs are sent at TIME=0, and both are received at TIME=1. And, the results are both sent at TIME=1, and are received at TIME=2.

Recall that TIME is not measuring computational time. If each action by Oracle R2DBC and Oracle Database requires 0.1 units of computational TIME, then we can say:

INSERTs are sent at TIME=0.1 and TIME=0.2, and are received at TIME=1.1 and TIME=1.2. And, the results are sent at TIME=1.2 and TIME=1.3, and are received at TIME=2.2 and TIME=2.3.

This is a bit more complicated to think about, but it is important to keep in mind. All database calls will require at least some computational time.

Below is another visual of the network traffic, but in this case the INSERTs are sent and received without pipelining:

TIME | ORACLE R2DBC     | NETWORK | ORACLE DATABASE
-----+------------------+---------+-----------------
   0 | Send INSERT-X    | ------> | WAITING
   1 | WAITING          | <------ | Send Result-X
   2 | Receive Result-X |         | WAITING
   2 | Send INSERT-Y    | ------> | WAITING
   3 | WAITING          | <------ | Send Result-Y
   4 | Receive Result-Y |         | WAITING

This visual shows a sequential process of sending and receiving. It can be compared to the concurrent process seen in the previous visual. In both cases, Oracle R2DBC and Oracle Database have the same number of WAITING actions. These actions are waiting for network transfers. And in both cases, each network transfer requires 1 unit of TIME.

So if network latency is the same, and the number of WAITING actions are the same (,and the computational times are the same), then how are these INSERTs completing in less TIME with pipelining? The answer is that pipelining allowed the network transfer times to be waited for concurrently.

In the first visual, with pipelining, the database waits for both INSERT-X and INSERT-Y at TIME=0. Compare that to the second visual, without pipelining, where the database waits for INSERT-X at TIME=0, and then waits again for INSERT-Y at TIME=2. That's 1 additional unit of TIME when compared to pipelining. The other additional unit of TIME happens on the Oracle R2DBC side. Without pipelining, it waits for Result-X at TIME=1, and then waits again for Result-Y at TIME=3. With pipelining, it waits for both results concurrently at TIME=1.

Requirements for Pipelining

There are some requirements which must be met in order to use pipelining. As explained in the previous section, the availability of pipelining can have a significant impact on performance. Users should review the requirements listed in this section when developing applications that rely on this performance gain.

In terms of functional behavior, the availability of pipelining will have no impact: With or without it, the same database calls are going be executed. Users who are not relying on pipelining performance do not necessarily need to review the requirements listed in this section. Oracle JDBC is designed to automatically check for these requirements, and it will fallback to using sequential network transfers if any requirement is not met.

Product Versions

Pipelining is only available with Oracle Database version 23.4 or newer. It also requires an Oracle JDBC version of 23.4 or newer, but this is already a transitive dependency of Oracle R2DBC.

Out Of Band Breaks

Pipelining requires out-of-band (OOB) breaks (ie: TCP urgent data) for cancelling statement execution. The Oracle JDBC Driver automatically checks if OOB is available, and will disable pipelining if it is not. The availability of OOB may depend on the operating system where Oracle R2DBC is running. Notably, OOB is not available on Mac OS (or at least not available in the way which Oracle JDBC needs it to be for sending TCP urgent data to Oracle Database).

For experimentation only, Mac OS users can choose to by-pass the OOB requirement by setting a JVM system property:

-Doracle.jdbc.disablePipeline=false

Bypassing the OOB requirement on Mac OS will result in non-functional implementations of Connection.setStatementTimeout(Duration), and Subscription.cancel() for a Subscription from Statement.execute().

Reactive Streams

Every method implemented by Oracle R2DBC that returns a Publisher has a JavaDoc which specifies the Publisher's behavior with regard to deferred execution and support for multiple Subscribers.

Oracle R2DBC's implementation of Publishers that emit one or zero items will typically defer execution until a Subscriber subscribes, support multiple Subscribers, and cache the result of a database call (the same result of the same call is emitted to each Subscriber).

Oracle R2DBC's implementation of Publishers that emit multiple items will typically defer execution until a Subscriber signals demand, and not support multiple subscribers.

Errors and Warnings

Oracle R2DBC creates R2dbcExceptions having the same ORA-XXXXX error codes used by Oracle Database and Oracle JDBC. The Database Error Messages document provides a reference for all ORA-XXXXX error codes.

Warning messages from Oracle Database are emitted as oracle.r2dbc.OracleR2dbcWarning segments. These segments may be consumed using Result.flatMap(Function):

result.flatMap(segment -> {
  if (segment instanceof OracleR2dbcWarning) {
    logWarning(((OracleR2dbcWarning)segment).getMessage());
    return emptyPublisher();
  }
  else if (segment instanceof Result.Message){
    ... handle an error ...
  }
  else {
    ... handle other segment types ...
  }
})

Unlike the errors of standard Result.Message segments, if a warning is not consumed by flatMap, then it will be silently discarded when a Result is consumed using the map or getRowsUpdated methods.

Transactions

Oracle R2DBC uses READ COMMITTED as the default transaction isolation level.

Oracle R2DBC also supports the SERIALIZABLE isolation level. If SERIALIZABLE isolation is configured, then the oracle.r2dbc.OracleR2dbcOptions.ENABLE_QUERY_RESULT_CACHE option must also be configured as false to avoid phantom reads.

READ COMMITTED and SERIALIZABLE are the only isolation levels supported by Oracle Database

Oracle Database does not support a lock wait timeout that is configurable within the scope of a transaction or session. Oracle R2DBC implements SPI methods that configure a lock wait timeout to throw UnsupportedOperationException.

Statements

Oracle R2DBC supports SQL execution with the Statement SPI.

Parameter Markers

A SQL command passed to Connection.createStatement(String) may include named parameter markers, unnamed parameter markers, or both.

Unnamed parameter markers may appear in SQL as a question mark (?):

connection.createStatement(
  "SELECT value FROM example WHERE id=?")
  .bind(0, 99)

The bind method must be called with a zero-based index to set the value of an unnamed parameter.

Named parameter markers may appear in SQL as a colon character (:) followed by an alpha-numeric name:

connection.createStatement(
  "SELECT value FROM example WHERE id=:id")
  .bind("id", 99)

The bind method may be called with a String valued name, or with zero-based index, to set the value of a named parameter. Parameter names are case-sensitive.

Batch Execution

The Statement.add() method may be used execute a DML command multiple times with a batch of different bind values. Oracle Database only supports batch execution for DML type SQL commands (INSERT/UPDATE/DELETE). Attempting to execute a SELECT query with a batch of bind values will result in an error.

Returning Generated Values

The Statement.returnGeneratedValues(String...) method may be called to return generated values from basic forms of INSERT and UPDATE statements.

If an empty set of column names is passed to returnGeneratedValues, the Statement will return the ROWID of each row affected by an INSERT or UPDATE.

Programmers are advised not to use the ROWID as if it were a primary key. The ROWID of a row change, or be reassigned to a different row. See https://asktom.oracle.com/pls/apex/asktom.search?tag=is-it-safe-to-use-rowid-to-locate-a-row for more information.

Returning generated values is only supported for INSERT and UPDATE commands in which a RETURNING INTO clause would be valid. For example, if a table is declared as:

CREATE TABLE example (
  id NUMBER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
  value VARCAHR(100))

Returning generated values is supported for the following statement:

connection.createStatement(
  "INSERT INTO example(value) VALUES (:value)")
  .bind("value", "x")
  .returningGeneratedValues("id")

This statement is supported because the INSERT could be written to include a RETURNING INTO clause:

INSERT INTO example(value) VALUES (:value) RETURING id INTO :id

As a counter example, returning generated values is not supported for the following statement:

connection.createStatement(
  "INSERT INTO example (value) SELECT 'y' FROM sys.dual")
  .returningGeneratedValues("id")

This statement is not supported because it can not be written to include a RETURNING INTO clause.

The Oracle Database SQL Language Reference specifies the INSERT and UPDATE commands for which a RETURNING INTO clause is supported.

For the INSERT syntax, see: https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/INSERT.html

For the UPDATE syntax, see: https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/UPDATE.html

Procedural Calls

The SQL string passed to Connection.createStatement(String) may execute a PL/SQL call:

connection.createStatement("BEGIN sayHello(:name_in, :greeting_out); END;")

OUT parameters are registered by invoking Statement.bind(int, Object) or Statement.bind(String, Object) with an instance of io.r2dbc.spi.Parameter implementing the io.r2dbc.spi.Parameter.Out marker interface:

statement.bind("greeting_out", Parameters.out(R2dbcType.VARCHAR))

Likewise, an IN OUT parameter would be registered by invoking Statement.bind(int, Object) or Statement.bind(String, Object) with an instance of io.r2dbc.spi.Parameter implementing both the io.r2dbc.spi.Parameter.Out and io.r2dbc.spi.Parameter.In marker interfaces.

OUT parameters are consumed by invoking Result.map(Function):

result.map(outParameters -> outParameters.get("greeting_out", String.class))

If a procedural call returns multiple results, the publisher returned by Statement.execute() emits one Result for each cursor returned by DBMS_SQL.RETURN_RESULT in the procedure. The order in which each Result is emitted corresponds to the order in which the procedure returns each cursor.

If a procedure returns cursors, and also has out parameters, then the Result for the out parameters is emitted last, after the Result for each cursor.

Type Mappings

Oracle R2DBC supports type mappings between Java and SQL for non-standard data types of Oracle Database.

Oracle SQL Type Java Type
JSON javax.json.JsonObject or oracle.sql.json.OracleJsonObject
DATE java.time.LocalDateTime
INTERVAL DAY TO SECOND java.time.Duration
INTERVAL YEAR TO MONTH java.time.Period
SYS_REFCURSOR io.r2dbc.spi.Result
VECTOR double[], float[], byte[], or oracle.sql.VECTOR

Unlike the standard SQL type named "DATE", the Oracle Database type named "DATE" stores values for year, month, day, hour, minute, and second. The standard SQL type only stores year, month, and day. LocalDateTime objects are able to store the same values as a DATE in Oracle Database.

BLOB, CLOB, and NCLOB

Oracle R2DBC allows large objects (LOBs) to be read and written as a reactive stream, or as a fully materialized value.

Prefetched LOB Data

When a SQL query returns a LOB column, only a portion of the LOB's content is received in the response from Oracle Database. The portion received in the SQL query response is referred to as "prefetched data". Any content remaining after the prefetched portion must be fetched with additional database calls.

For example, if a SQL query returns a LOB that is 100MB in size, then the response might prefetch only the first 1MB of the LOB's content. Additional database calls would be required to fetch the remaining 99MB of content.

By default, Oracle R2DBC attempts to prefetch the entire content of a LOB. Oracle R2DBC will request up to 1GB of prefetched data from Oracle Database when executing a SQL query.

Materialzed Type Mapping

The Row.get(...) method allows LOB values to be mapped into materialized types like ByteBuffer and String. If the entire LOB has been prefetched, then Row.get(...) can return a ByteBuffer/String without any additional database calls. However, if the LOB value is larger than the prefetch size, then Row.get(...) must execute a blocking database call to fetch the remainder of that value.

Streamed Type Mapping

In a system that consumes very large LOBs, a very large amount of memory will be consumed if the entire LOB is prefetched. When a LOB is too large to be prefetched entirely, a smaller prefetch size can be configured using the oracle.jdbc.defaultLobPrefetchSize option, and the LOB can be consumed as a stream. By mapping LOB columns to Blob or Clob objects, the content can be consumed as a reactive stream.

ARRAY

Oracle Database supports ARRAY as a user defined type only. A CREATE TYPE command is used to define an ARRAY type:

CREATE TYPE MY_ARRAY AS ARRAY(8) OF NUMBER

Oracle R2DBC defines oracle.r2dbc.OracleR2dbcType.ArrayType as a Type for representing user defined ARRAY types. A Parameter with a type of ArrayType must be used when binding array values to a Statement.

Publisher<Result> arrayBindExample(Connection connection) {
  Statement statement =
    connection.createStatement("INSERT INTO example VALUES (:array_bind)");

  // Use the name defined for an ARRAY type:
  // CREATE TYPE MY_ARRAY AS ARRAY(8) OF NUMBER
  ArrayType arrayType = OracleR2dbcTypes.arrayType("MY_ARRAY");
  Integer[] arrayValues = {1, 2, 3};
  statement.bind("arrayBind", Parameters.in(arrayType, arrayValues));

  return statement.execute();
}

A Parameter with a type of ArrayType must also be used when binding OUT parameters of a PL/SQL call.

Publisher<Result> arrayOutBindExample(Connection connection) {
  Statement statement =
    connection.createStatement("BEGIN; exampleCall(:array_bind); END;");

  // Use the name defined for an ARRAY type:
  // CREATE TYPE MY_ARRAY AS ARRAY(8) OF NUMBER
  ArrayType arrayType = OracleR2dbcTypes.arrayType("MY_ARRAY");
  statement.bind("arrayBind", Parameters.out(arrayType));

  return statement.execute();
}

ARRAY values may be consumed from a Row or OutParameter as a Java array. The element type of the Java array may be any Java type that is supported as a mapping for the SQL type of the ARRAY. For instance, if the ARRAY type is NUMBER, then a Integer[] mapping is supported:

Publisher<Integer[]> arrayMapExample(Result result) {
  return result.map(readable -> readable.get("arrayValue", Integer[].class));
}

OBJECT

Oracle Database supports OBJECT as a user defined type. A CREATE TYPE command is used to define an OBJECT type:

CREATE TYPE PET AS OBJECT(
  name VARCHAR(128),
  species VARCHAR(128),
  weight NUMBER,
  birthday DATE)

Oracle R2DBC defines oracle.r2dbc.OracleR2dbcType.ObjectType as a Type for representing user defined OBJECT types. A Parameter with a type of ObjectType may be used to bind OBJECT values to a Statement.

Use an Object[] to bind the attribute values of an OBJECT by index:

Publisher<Result> objectArrayBindExample(Connection connection) {
  Statement statement =
    connection.createStatement("INSERT INTO petTable VALUES (:petObject)");

  // Bind the attributes of the PET OBJECT defined above
  ObjectType objectType = OracleR2dbcTypes.objectType("PET");
  Object[] attributeValues = {
    "Derby",
    "Dog",
    22.8,
    LocalDate.of(2015, 11, 07)
  };
  statement.bind("petObject", Parameters.in(objectType, attributeValues));

  return statement.execute();
}

Use a Map<String,Object> to bind the attribute values of an OBJECT by name:

Publisher<Result> objectMapBindExample(Connection connection) {
  Statement statement =
    connection.createStatement("INSERT INTO petTable VALUES (:petObject)");

  // Bind the attributes of the PET OBJECT defined above
  ObjectType objectType = OracleR2dbcTypes.objectType("PET");
  Map<String,Object> attributeValues = Map.of(
    "name", "Derby",
    "species", "Dog",
    "weight", 22.8,
    "birthday", LocalDate.of(2015, 11, 07));
  statement.bind("petObject", Parameters.in(objectType, attributeValues));

  return statement.execute();
}

A Parameter with a type of ObjectType must be used when binding OUT parameters of OBJECT types for a PL/SQL call:

Publisher<Result> objectOutBindExample(Connection connection) {
  Statement statement =
    connection.createStatement("BEGIN; getPet(:petObject); END;");

  ObjectType objectType = OracleR2dbcTypes.objectType("PET");
  statement.bind("petObject", Parameters.out(objectType));

  return statement.execute();
}

OBJECT values may be consumed from a Row or OutParameter as an oracle.r2dbc.OracleR2dbcObject. The OracleR2dbcObject interface is a subtype of io.r2dbc.spi.Readable. Attribute values may be accessed using the standard get methods of Readable. The get methods of OracleR2dbcObject support all SQL to Java type mappings defined by the R2DBC Specification:

Publisher<Pet> objectMapExample(Result result) {
  return result.map(row -> {
    OracleR2dbcObject oracleObject = row.get(0, OracleR2dbcObject.class); 
    return new Pet(
      oracleObject.get("name", String.class),
      oracleObject.get("species", String.class),
      oracleObject.get("weight", Float.class),
      oracleObject.get("birthday", LocalDate.class));
  });
}

Instances of OracleR2dbcObject may be passed directly to Statement bind methods:

Publisher<Result> objectBindExample(
  OracleR2dbcObject oracleObject, Connection connection) {
  Statement statement =
    connection.createStatement("INSERT INTO petTable VALUES (:petObject)");
  
  statement.bind("petObject", oracleObject);

  return statement.execute();
}

Attribute metadata is exposed by the getMetadata method of OracleR2dbcObject:

void printObjectMetadata(OracleR2dbcObject oracleObject) {
  OracleR2dbcObjectMetadata metadata = oracleObject.getMetadata();
  OracleR2dbcTypes.ObjectType objectType = metadata.getObjectType();
  
  System.out.println("Object Type: " + objectType);
  metadata.getAttributeMetadatas()
    .stream()
    .forEach(attributeMetadata -> {
      System.out.println("\tAttribute Name: " + attributeMetadata.getName()));
      System.out.println("\tAttribute Type: " + attributeMetadata.getType()));
    });
}

REF Cursor

Use the oracle.r2dbc.OracleR2dbcTypes.REF_CURSOR type to bind SYS_REFCURSOR out parameters:

Publisher<Result> executeProcedure(Connection connection) {
  connection.createStatement(
    "BEGIN example_procedure(:cursor_parameter); END;")
  .bind("cursor_parameter", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
  .execute()
}

A SYS_REFCURSOR out parameter can be mapped to an io.r2dbc.spi.Result:

Publisher<Result> mapOutParametersResult(Result outParametersResult) {
  return outParametersResult.map(outParameters ->
    outParameters.get("cursor_parameter", Result.class));
}

The rows of a SYS_REFCURSOR may be consumed from the Result it is mapped to:

Publisher<ExampleObject> mapRefCursorRows(Result refCursorResult) {
  return refCursorResult.map(row ->
    new ExampleObject(
      row.get("id_column", Long.class),
      row.get("value_column", String.class)));
}

VECTOR

The default mapping for VECTOR is the oracle.sql.VECTOR class. Instances of this class may be passed to Statement.bind(int/String, Object):

void bindVector(Statement statement, float[] floatArray) throws SQLException {
  final VECTOR vector;
  try {
    vector = VECTOR.ofFloat32Values(floatArray);
  }
  catch (SQLException sqlException) {
    throw new IllegalArgumentException(sqlException);
  }
  statement.bind("vector", vector);
}

The oracle.sql.VECTOR class defines three factory methods: ofFloat64Values, ofFloat32Values, and ofInt8Values. These methods support Java to VECTOR conversions of boolean[], byte[], short[], int[], long[], float[], and double[]:

void bindVector(Statement statement, int[] intArray) {
  final VECTOR vector;
  try {
    vector = VECTOR.ofFloat64Values(intArray);
  }
  catch (SQLException sqlException) {
    throw new IllegalArgumentException(sqlException);
  }
  statement.bind("vector", vector);
}

The factory methods of oracle.sql.VECTOR may perform lossy conversions, such as when converting a double[] into a VECTOR of 32-bit floating point numbers. The JavaDocs of these methods specify which conversions are lossy.

The OracleR2dbcTypes.VECTOR type descriptor can be used to register an OUT or IN/OUT parameter:

void registerOutVector(Statement statement) {
  Parameter outVector = Parameters.out(OracleR2dbcTypes.VECTOR);
  statement.bind("vector", outVector);
}

The OracleR2dbcTypes.VECTOR type descriptor can also be used as an alternative to oracle.sql.VECTOR when binding an IN parameter to a double[], float[], or byte[]:

void bindVector(Statement statement, float[] floatArray) {
  Parameter inVector = Parameters.in(OracleR2dbcTypes.VECTOR, floatArray);
  statement.bind("vector", inVector);
}

Note that double[], float[], and byte[] can NOT be passed directly to Statement.bind(int/String, Object) when binding VECTOR data. The R2DBC Specification defines ARRAY as the default mapping for Java arrays.

A VECTOR column or OUT parameter is converted to oracle.sql.VECTOR by default. The column or OUT parameter can also be converted to double[], float[], or byte[] by passing the corresponding array class to the get methods:

float[] getVector(io.r2dbc.Readable readable) {
  return readable.get("vector", float[].class);
}

Returning VECTOR from DML

Returning a VECTOR column with Statement.returningGeneratedValues(String...) is not supported due to a defect in the 23.4 release of Oracle JDBC. Attempting to return a VECTOR column will result in a Subscriber that never receives onComplete or onError. The defect will be fixed in the next release of Oracle JDBC.

A RETURNING ... INTO clause can be used as a temporary workaround. This clause must appear within a PL/SQL block, denoted by the BEGIN and END; keywords. In the following example, a VECTOR column named "embedding" is returned:

Publisher<double[]> returningVectorExample(Connection connection, String vectorString) {

  Statement statement = connection.createStatement(
      "BEGIN INSERT INTO example(embedding)"
      + " VALUES (TO_VECTOR(:vectorString, 999, FLOAT64))"
      + " RETURNING embedding INTO :embedding;"
      + " END;")
    .bind("vectorString", vectorString)
    .bind("embedding", Parameters.out(OracleR2dbcTypes.VECTOR));

    return Flux.from(statement.execute())
      .flatMap(result ->
        result.map(outParameters ->
          outParameters.get("embedding", double[].class)));
}

Secure Programming Guidelines

The following security related guidelines should be adhered to when programming with the Oracle R2DBC Driver.

Defend Against SQL Injection Attacks

  • Always specify the parameters of a SQL command using the bind methods of io.r2dbc.spi.Statement.
    • Do not use String concatenation to specify parameters of a SQL command.
    • Do not use format Strings to specify parameters of a SQL command.

Protect Passwords

  • Do not hard code passwords in your source code.
  • Avoid hard coding passwords in the R2DBC URL.
    • When handling URL strings in code, be aware that a clear text password may appear in the user info section.
  • Use a sensitive io.r2dbc.spi.Option to specify passwords.
    • If possible, specify the Option's value as an instance of java.nio.CharBuffer or java.lang.StringBuffer and clear the contents immediately after ConnectionFactories.get(ConnectionFactoryOptions) has returned. Oracle R2DBC's implementation of ConnectionFactory does not retain a reference to the clear text password.

Protect Network Communications

  • Use SSL/TLS if possible. Use any of the following methods to enable SSL/TLS:
    • Specify the boolean value of true for io.r2dbc.spi.ConnectionFactoryOptions.SSL
    • Specify "r2dbcs:" as the R2DBC URL schema.
    • Specify "ssl=true" in the query section of the R2DBC URL.
  • Use Option.sensitiveValueOf(String) when creating an Option that specifies a password.
    • Option.sensitiveValueOf(OracleConnection.CONNECTION_PROPERTY_WALLET_PASSWORD)
      • An SSO wallet does not require a password.
    • Option.sensitiveValueOf(OracleConnection.CONNECTION_PROPERTY_THIN_JAVAX_NET_SSL_KEYSTOREPASSWORD)
    • Option.sensitiveValueOf(OracleConnection.CONNECTION_PROPERTY_THIN_JAVAX_NET_SSL_TRUSTSTOREPASSWORD)

Defend Against Denial-of-Service Attacks

  • Use a connection pool and configure a maximum size to limit the number of database sessions created by ConnectionFactory.create()
  • Enforce a maximum batch size to limit invocations of Statement.add() or Batch.add(String).
  • Enforce a maximum fetch size to limit values supplied to Statement.fetchSize(int).
  • Enforce a maximum buffer size to limit memory usage when reading Blob and Clob objects.

oracle-r2dbc's People

Contributors

akshatamoon avatar jeandelavarene avatar kuassim avatar michael-a-mcmahon avatar pat-goins avatar sephiroth-j avatar spavlusieva avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

oracle-r2dbc's Issues

Is there support for list in bind method?

Is there support for list in bind method?
I'm trying to perform a query with an "IN (list of numbers)" clause and an exception is thrown


java.lang.IllegalArgumentException: Unsupported Java type:class java.util.ArrayList
	at oracle.r2dbc.impl.OracleStatementImpl.requireSupportedJavaType(OracleStatementImpl.java:1554) ~[oracle-r2dbc-0.3.0.jar:0.3.0]

No Way to Configure a Non-Default Executor

Oracle R2DBC uses the common ForkJoinPool (FJP) by default. For some systems, this is not ideal as the FJP threads may be abused with blocking or long running operations.

It would be valuable if Oracle R2DBC allowed a non-default Executor to be configured.

Allow to specify v$session.program for connection

We need a way to set the v$session.program at the connection from a R2DBC Oracle connection and after reviewing documentation and stack overflow I haven't found a way to do it
(my case is within a Spring application, but I believe this to be irrelevant)

We've added an issue at StackOverflow without answers: https://stackoverflow.com/questions/72048278/specify-vsession-program-for-r2dbc-oracle-connection

We've tried some solutions including this but without success.

ConnectionFactories.get( ConnectionFactoryOptions.parse(dbUrl) .mutate() .option(Option.valueOf("v$session.program"), "PROGRAMNAME") .build());

After reviewing the driver source code, it seems the property would need to be supported at the OracleReactiveJdbcAdapter::JDBC_CONNECTION_PROPERTY_OPTIONS Set
but any other way to achieve the same goal would be useful.

If this is already possible please excuse me, but I haven't found one.

R2DBC integration with Mutiny example

I am new to both reactive-programming and R2DBC. I have recently come to know about a reactive oracle API (R2DBC) and wanted to see how I can integrate this inside my Quarkus/Kotlin project using Mutiny. I've seen the Mutiny example on https://r2dbc.io but wanted to know how I can convert this to, let's say, an entity that maps to a table? Essentially, I want to only perform database inserts based on the given entities but I see that R2DBC's example is just a simple native query. How can I accomplish this? I am currently using Hibernate ORM with Panache.

I will post here a sample snippet of my current code using only Mutiny:

fun save(entity: SomeDatabaseEntity): Uni<Void> =
    Uni.createFrom().voidItem().invoke { -> someDatabaseRepository.persistEntity(entity) }

Is there a similar way that I can achieve the above with R2DBC? Any suggestions/tips are greatly appreciated, thanks!

r2dbc pool support

When trying to use with spring-boot-starter-data-r2dbc and enabled pool
I get the following exception

2021-06-16 23:08:35,144 [RMI TCP Connection(2)-100.64.118.0] DEBUG org.springframework.core.env.PropertySourcesPropertyResolver [logKeyFound] [RMI TCP Connection(2)-100.64.118.0] [] : Found key 'local.server.port' in PropertySource 'server.ports' with value of type Integer
2021-06-16 23:08:35,377 [RMI TCP Connection(3)-100.64.118.0] DEBUG reactor.util.Loggers$Slf4JLogger [debug] [RMI TCP Connection(3)-100.64.118.0] [] : Obtaining new connection from the driver
2021-06-16 23:08:35,379 [RMI TCP Connection(3)-100.64.118.0] DEBUG reactor.util.Loggers$Slf4JLogger [debug] [RMI TCP Connection(3)-100.64.118.0] [] : should warm up 4 extra resources
2021-06-16 23:08:37,571 [ForkJoinPool.commonPool-worker-3] DEBUG reactor.util.Loggers$Slf4JLogger [debug] [ForkJoinPool.commonPool-worker-3] [] : Duplicate Subscription has been detected
java.lang.IllegalStateException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality)
	at reactor.core.Exceptions.duplicateOnSubscribeException(Exceptions.java:180)
	at reactor.core.publisher.Operators.reportSubscriptionSet(Operators.java:1083)
	at reactor.core.publisher.Operators.setOnce(Operators.java:1188)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:237)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:72)
	at reactor.core.publisher.Operators.reportThrowInSubscribe(Operators.java:225)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:71)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
	at reactor.pool.AbstractPool$Borrower.deliver(AbstractPool.java:410)
	at reactor.pool.SimpleDequePool.lambda$drainLoop$7(SimpleDequePool.java:382)
	at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onNext(FluxDoOnEach.java:154)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onNext(FlowAdapters.java:218)
	at oracle.jdbc.datasource.impl.OracleDataSource$ConnectionPublisher$ConnectionSubscription.emitConnection(OracleDataSource.java:2746)
	at oracle.jdbc.datasource.impl.OracleDataSource$ConnectionPublisher$ConnectionSubscription.lambda$publishConnectionAsync$0(OracleDataSource.java:2733)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:840)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

my pool configuration

spring:
  r2dbc:
    username: ....
    password: ....
    url: r2dbc:...
    pool:
      initial-size: 5
      max-idle-time: 10m
      max-size: 30
      enabled: true
  data:
    r2dbc:
      repositories:
        enabled: true

Is working with pool currently supported?

LDAP URI it does not work

oracle-r2dbc version: 0.2.0
spring-boot: 2.5.2
Database: Oracle 19c

r2dbc:oracle:thin:@LDAP://server:port/database,cn=OracleContext,dc=WORLD

Error:

reactor.core.Exceptions$ErrorCallbackNotImplemented: io.r2dbc.spi.R2dbcTransientResourceException: [17002] [08006] Error de E/S: Invalid connection string format, a valid format is: "host:port:sid" (CONNECTION_ID=qyq8AuNhQreSmex/HBjgiA==)
Caused by: io.r2dbc.spi.R2dbcTransientResourceException: Error de E/S: Invalid connection string format, a valid format is: "host:port:sid" (CONNECTION_ID=qyq8AuNhQreSmex/HBjgiA==)
at oracle.r2dbc.impl.OracleR2dbcExceptions.toR2dbcException(OracleR2dbcExceptions.java:211)
at oracle.r2dbc.impl.OracleReactiveJdbcAdapter$$Lambda$1262/00000000A3888C30.apply(Unknown Source)
at reactor.core.publisher.Flux.lambda$onErrorMap$29(Flux.java:6720)
at reactor.core.publisher.Flux$$Lambda$1264/000000008D4CE270.apply(Unknown Source)

code for test:

		ConnectionFactory connectionFactory = ConnectionFactories.get(
				"r2dbc:oracle:thin:@ldap://server:port/database,cn=OracleContext,dc=WORLD");

		Mono.from(connectionFactory.create())
				.flatMapMany(connection ->
						Flux.from(connection.createStatement(
								"SELECT 'Hello, Oracle' FROM sys.dual")
								.execute())
								.flatMap(result ->
										result.map((row, metadata) -> row.get(0, String.class)))
								.doOnNext(System.out::println)
								.thenMany(connection.close()))
				.subscribe();

thanks

TypeMappingTest fails

A failure similar to the following is reported during test runs:

[INFO] Running oracle.r2dbc.impl.TypeMappingTest
[ERROR] Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 3.482 s <<< FAILURE! - in oracle.r2dbc.impl.TypeMappingTest
[ERROR] oracle.r2dbc.impl.TypeMappingTest.testDatetimeTypeMappings  Time elapsed: 0.43 s  <<< FAILURE!
org.opentest4j.AssertionFailedError: expected: <2038-10-23T09:42:01.000000001> but was: <2038-10-23T16:42:01.000000001>
	at [email protected]/oracle.r2dbc.impl.TypeMappingTest.verifyTypeMapping(TypeMappingTest.java:458)
	at [email protected]/oracle.r2dbc.impl.TypeMappingTest.verifyTypeMapping(TypeMappingTest.java:415)
	at [email protected]/oracle.r2dbc.impl.TypeMappingTest.testDatetimeTypeMappings(TypeMappingTest.java:308)

Failed to obtain R2DBC, this publisher does not support multiple subscribers

I try to run r2dbc application with spring-boot-starter-data-r2dbc and oracle-r2dbc but it gives me an exception " org.springframework.dao.DataAccessResourceFailureException: Failed to obtain R2DBC Connection; nested exception is java.lang.IllegalStateException: This publisher does not support multiple subscribers".
Here is my properties file:
spring.r2dbc.url=r2dbc:oracle:thin://localhost:1521:orcl spring.r2dbc.username=user spring.r2dbc.password=password
and dependenices part of gradle file:
dependencies { ktlint 'com.pinterest:ktlint:0.41.0' implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion") implementation("org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion") implementation('com.fasterxml.jackson.module:jackson-module-kotlin') implementation('org.codehaus.janino:janino') implementation('org.springframework.cloud:spring-cloud-starter-stream-kafka') implementation('org.springframework.cloud:spring-cloud-stream') implementation('org.springframework.boot:spring-boot-configuration-processor') implementation('org.apache.httpcomponents:httpclient:4.5.8') implementation('org.springframework.retry:spring-retry') implementation("org.springframework.boot:spring-boot-starter-webflux") implementation('org.springframework.boot:spring-boot-starter-data-r2dbc') implementation('com.oracle.database.r2dbc:oracle-r2dbc') implementation('com.oracle.database.jdbc:ojdbc11:21.1.0.0') }
Am I missing any configuration or is this a known limitation?

Login timeout error when `connectTimeout` is set

I'm running Oracle tests on a remote docker with a slow connection and if .options.connectTimeout is set to anything (20 hours etc) after some time connection fails almost immediately:

Caused by: io.r2dbc.spi.R2dbcTimeoutException: [18714] [08006] Login timeout specified by DataSource.setLoginTimeout(int) or by the oracle.jdbc.loginTimeout property has expired
      at oracle.r2dbc.impl.OracleReactiveJdbcAdapter.lambda$publishConnection$9(OracleReactiveJdbcAdapter.java:647)
      at reactor.core.publisher.Mono.lambda$onErrorMap$30(Mono.java:3474)
      at reactor.core.publisher.Mono.lambda$onErrorResume$32(Mono.java:3564)
      at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
      at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
      at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
      at reactor.core.publisher.Operators.error(Operators.java:197)

The problem disappears when the property is removed.

Bad SQL grammar - Spring data R2DBC with Oracle

I'm using Spring Data R2DBC with Oracle 11g and I have the following error using method findById of R2dbcCrudRepository

executeMany; bad SQL grammar [SELECT GAME_PHASE.* FROM GAME_PHASE WHERE GAME_PHASE.ID = :P0_id FETCH FIRST 2 ROWS ONLY]

This is the repository declaration

public interface ReactiveGamePhaseRepository extends R2dbcRepository<GamePhase, Long> {
}

I don't understand why FETCH FIRST 2 ROWS ONLY is added to the query and it's the cause of problem.

I have the same problem writing the query using R2dbcEntityTemplate like below:

r2dbcEntityTemplate.selectOne(query(where("id").is(id)), GamePhase.class);

And these are the used dependancies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    <version>2.5.12</version>
</dependency>

<dependency>
    <groupId>com.oracle.database.r2dbc</groupId>
    <artifactId>oracle-r2dbc</artifactId>
    <version>1.0.0</version>
</dependency>

Can't use Statement.returnGeneratedValues() with INSERT .. SELECT

This is perfectly valid Oracle SQL:

create table t (id number generated always as identity primary key, val number);
insert into t (val) select 1 from dual union all select 2 from dual;

But when I try to run the INSERT .. SELECT statement with r2dbc trying to fetch the generated identity values:

System.out.println((
    Flux.from(connectionFactory.create())
        .flatMap(c -> c
            .createStatement("insert into t (val) select 1 from dual union all select 2 from dual")
            .returnGeneratedValues("id")
            .execute())
        .flatMap(it -> it.map((r, m) -> r.get(0)))
        .collectList()
        .block().get(0)
));

Then I'm getting the following errors:

Exception in thread "main" io.r2dbc.spi.R2dbcBadGrammarException: [933] [42000] ORA-00933: SQL-Befehl wurde nicht korrekt beendet

	at oracle.r2dbc.impl.OracleR2dbcExceptions.toR2dbcException(OracleR2dbcExceptions.java:162)
	at reactor.core.publisher.Flux.lambda$onErrorMap$29(Flux.java:6670)
	at reactor.core.publisher.Flux.lambda$onErrorResume$30(Flux.java:6723)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
	at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onError(FlowAdapters.java:223)
	at oracle.jdbc.internal.CompletionStageUtil$BatchItemPublisher.subscribeToFailedBatch(CompletionStageUtil.java:544)
	at oracle.jdbc.internal.CompletionStageUtil$BatchItemPublisher.lambda$subscribe$0(CompletionStageUtil.java:507)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Mono.block(Mono.java:1703)
		at org.jooq.testscripts.R2DBC.main(R2DBC.java:35)
Caused by: java.sql.SQLSyntaxErrorException: ORA-00933: SQL-Befehl wurde nicht korrekt beendet

	at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:628)
	at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:562)
	at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1207)
	at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:727)
	at oracle.jdbc.driver.T4CTTIfun.lambda$receiveRPCAsync$2(T4CTTIfun.java:426)
	at oracle.jdbc.internal.CompletionStageUtil.handleNormalCompletion(CompletionStageUtil.java:176)
	at oracle.jdbc.internal.CompletionStageUtil.lambda$normalCompletionHandler$1(CompletionStageUtil.java:227)
	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
	at oracle.jdbc.driver.T4CTTIfun.lambda$receiveRPCAsync$0(T4CTTIfun.java:406)
	at oracle.jdbc.driver.RestrictedLock.lambda$runUnrestricted$0(RestrictedLock.java:253)
	at oracle.jdbc.driver.RestrictedLock.callUnrestricted(RestrictedLock.java:273)
	at oracle.jdbc.driver.RestrictedLock.runUnrestricted(RestrictedLock.java:252)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createInternalCodeExecutor$7(PhysicalConnection.java:11647)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createInternalCodeExecutor$8(PhysicalConnection.java:11644)
	... 6 more
Caused by: Error : 933, Position : 78, Sql = insert into t (val) select 1 from dual union all select 2 from dual RETURNING id INTO :1 , OriginalSql = insert into t (val) select 1 from dual union all select 2 from dual RETURNING id INTO ?, Error Msg = ORA-00933: SQL-Befehl wurde nicht korrekt beendet

	at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:632)
	... 22 more

Support binding boolean values

While Oracle SQL doesn't (yet?) support the standard SQL BOOLEAN type, JDBC does to some extent, mapping BOOLEAN values to 1=TRUE and 0=FALSE. Unfortunately, this is currently not possible with the oracle-r2dbc driver. This fails:

System.out.println(
Flux.from(cf.create())
    .flatMap(c -> c.createStatement("select ? from dual").bind(0, true).execute())
    .flatMap(it -> it.map((r, m) -> r.get(0, Boolean.class)))
    .collectList()
    .block()
);

The exception being:

Exception in thread "main" oracle.r2dbc.impl.OracleR2dbcExceptions$OracleR2dbcException: [17004] [99999] Ungültiger Spaltentyp
	at oracle.r2dbc.impl.OracleR2dbcExceptions.toR2dbcException(OracleR2dbcExceptions.java:215)
	at oracle.r2dbc.impl.OracleR2dbcExceptions.runOrHandleSQLException(OracleR2dbcExceptions.java:247)
	at oracle.r2dbc.impl.OracleStatementImpl.setInParameters(OracleStatementImpl.java:1144)
	at oracle.r2dbc.impl.OracleStatementImpl.lambda$executeSql$2(OracleStatementImpl.java:674)
	at oracle.r2dbc.impl.OracleStatementImpl.lambda$execute$37(OracleStatementImpl.java:1342)
	at reactor.core.publisher.FluxUsingWhen.deriveFluxFromResource(FluxUsingWhen.java:119)
	at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:85)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8185)
	at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:93)
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8185)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426)
	at org.jooq.test.setup.DatabaseSetup$1.lambda$2(DatabaseSetup.java:283)
	at org.jooq.impl.Internal$1.onNext(Internal.java:433)
	at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onNext(FlowAdapters.java:218)
	at oracle.jdbc.datasource.impl.OracleDataSource$ConnectionPublisher$ConnectionSubscription.emitConnection(OracleDataSource.java:2746)
	at oracle.jdbc.datasource.impl.OracleDataSource$ConnectionPublisher$ConnectionSubscription.lambda$publishConnectionAsync$0(OracleDataSource.java:2733)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:844)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713)
	at java.base/java.security.AccessController.doPrivileged(AccessController.java:391)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1176)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1647)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1614)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Mono.block(Mono.java:1703)
		at org.jooq.testscripts.R2DBC.main(R2DBC.java:42)
Caused by: java.sql.SQLException: Ungültiger Spaltentyp
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectCritical(OraclePreparedStatement.java:8722)
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:8204)
	at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:8173)
	at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:12300)
	at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:12249)
	at oracle.jdbc.driver.OraclePreparedStatementWrapper.setObject(OraclePreparedStatementWrapper.java:1230)
	at oracle.r2dbc.impl.OracleStatementImpl.lambda$setInParameters$29(OracleStatementImpl.java:1145)
	at oracle.r2dbc.impl.OracleR2dbcExceptions.runOrHandleSQLException(OracleR2dbcExceptions.java:244)
	... 35 more

Binding a NULL value also fails:

System.out.println(
Flux.from(cf.create())
    .flatMap(c -> c.createStatement("select ? from dual").bindNull(0, Boolean.class).execute())
    .flatMap(it -> it.map((r, m) -> r.get(0, Boolean.class)))
    .collectList()
    .block()
);

... with a slightly different stack trace

Exception in thread "main" oracle.r2dbc.impl.OracleR2dbcExceptions$OracleR2dbcException: [17004] [99999] Ungültiger Spaltentyp: 16
	at oracle.r2dbc.impl.OracleR2dbcExceptions.toR2dbcException(OracleR2dbcExceptions.java:215)
	at oracle.r2dbc.impl.OracleR2dbcExceptions.runOrHandleSQLException(OracleR2dbcExceptions.java:247)
	at oracle.r2dbc.impl.OracleStatementImpl.setInParameters(OracleStatementImpl.java:1144)
	at oracle.r2dbc.impl.OracleStatementImpl.lambda$executeSql$2(OracleStatementImpl.java:674)
	at oracle.r2dbc.impl.OracleStatementImpl.lambda$execute$37(OracleStatementImpl.java:1342)
	at reactor.core.publisher.FluxUsingWhen.deriveFluxFromResource(FluxUsingWhen.java:119)
	at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:85)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8185)
	at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:93)
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8185)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426)
	at org.jooq.test.setup.DatabaseSetup$1.lambda$2(DatabaseSetup.java:283)
	at org.jooq.impl.Internal$1.onNext(Internal.java:433)
	at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onNext(FlowAdapters.java:218)
	at oracle.jdbc.datasource.impl.OracleDataSource$ConnectionPublisher$ConnectionSubscription.emitConnection(OracleDataSource.java:2746)
	at oracle.jdbc.datasource.impl.OracleDataSource$ConnectionPublisher$ConnectionSubscription.lambda$publishConnectionAsync$0(OracleDataSource.java:2733)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:844)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713)
	at java.base/java.security.AccessController.doPrivileged(AccessController.java:391)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1176)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1647)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1614)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Mono.block(Mono.java:1703)
		at org.jooq.testscripts.R2DBC.main(R2DBC.java:42)
Caused by: java.sql.SQLException: Ungültiger Spaltentyp: 16
	at oracle.jdbc.driver.OracleStatement.getInternalType(OracleStatement.java:4822)
	at oracle.jdbc.driver.OraclePreparedStatement.setNullCritical(OraclePreparedStatement.java:4564)
	at oracle.jdbc.driver.OraclePreparedStatement.setNullInternal(OraclePreparedStatement.java:4437)
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:8190)
	at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:8173)
	at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:12268)
	at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:12249)
	at oracle.jdbc.driver.OraclePreparedStatementWrapper.setObject(OraclePreparedStatementWrapper.java:1230)
	at oracle.r2dbc.impl.OracleStatementImpl.lambda$setInParameters$29(OracleStatementImpl.java:1145)
	at oracle.r2dbc.impl.OracleR2dbcExceptions.runOrHandleSQLException(OracleR2dbcExceptions.java:244)
	... 35 more

Binding an int works:

System.out.println(
Flux.from(cf.create())
    .flatMap(c -> c.createStatement("select ? from dual").bind(0, 1).execute())
    .flatMap(it -> it.map((r, m) -> r.get(0, Boolean.class)))
    .collectList()
    .block()
);

System.out.println(
Flux.from(cf.create())
    .flatMap(c -> c.createStatement("select ? from dual").bindNull(0, Integer.class).execute())
    .flatMap(it -> it.map((r, m) -> Optional.ofNullable(r.get(0, Boolean.class))))
    .collectList()
    .block()
);

Producing

[true]
[Optional.empty]

Notice how I can read a Boolean.class from a Row, but I can't bind it to the Statement. Given that ojdbc implements java.sql.Statement.setBoolean(), I think this is a bug or oversight.

java.lang.UnsupportedOperationException: This method is deprecated for removal

Hi,
Although the existing previous issues but I feel confused about what to do. So In this issue I will summarize what I am facing.
Development environment: Spring, R2DBC, Oracle DB
Versions: spring boot --> 2.7.0 , Oracle R2DBC --> 0.4.0
Task Objective: reading a specific entity from the db with its related entities using a customized written sql statement to make one single DB query for all data.
The repository code that retrieves the data:
@repository public class MyCustomRepository { private static final String query ="..."; private final DatabaseClient databaseClient; //org.springframework.r2dbc.core.DatabaseClient this.databaseClient.sql(query) // .bind("fName", fName) // .bind("lName", lName) // .fetch().all() // s .bufferUntilChanged(person -> person.get("id")) // .map(.......) }
Observations:

  • previous observation: primarily, I got the error "Failed to obtain R2DBC, this publisher does not support multiple subscribers", even if I specify the version of oracle-r2db, r2dbc-spi and r2dbc-pool explicitly to be
    <oracle-r2dbc.version>0.2.0</oracle-r2dbc.version> <r2dbc-spi.version>0.9.0.M1</r2dbc-spi.version> <r2dbc-pool.version>0.9.0.M1</r2dbc-pool.version>

  • current observation: java.lang.UnsupportedOperationException: This method is deprecated for removal getColumnNames().
    Based on the previous observation, I tried two different solutions:

  1. Using bom Borca-SR1
  2. using Spring 2.7.0 with oracle-r2dbc 0.4.0 (current implemented solution)
    but both of them raise the mentioned exception of depricated method getColumnNames()

I think this method is called in bufferUntilChanged(), so execluding it exciplicitly is not possible. The method bufferUntilChanged() helps me to aggregate each person entity with its related entities in map().
Questions:

  1. Is my thought that bufferUntilChanged() call getColumnNames() right? Should it be changed in bufferUntilChanged() implementation?
  2. Is there any way to decouple getColumnName() from bufferUntilChanged()?
  3. Is there any alternative to the method bufferUntilChanged() that does not raise such exception?

Sorry for the long description but as I am new to the thema I wanted o make sure to pull all the info. as much as spossible

Regressions comparing with 0.1

I'm hitting a lot of errors trying to upgrade the dependency to 0.2 in Micronaut R2dbc integration

  • We used to generate the boolean representation as NUMBER(3) and it used to work, now it fails because every boolean is expected to be BOOLEAN type which we don't have in version 18, the only one available officially at https://hub.docker.com/r/gvenzl/oracle-xe
  • It looks like byte[] type is not supported anymore and ByteBuffer is the only one supported

`Statement.returnGeneratedValues()` leads to `NullPointerException` upon `INSERT` that does not generate keys

With version 0.4.0, calling returnGeneratedValues() upon an insert leads to a NullPointerException. Omitting returnGeneratedValues() does not raise the exception.

DDL:

CREATE TABLE legoset (
    id          INTEGER PRIMARY KEY,
    version     INTEGER NULL,
    name        VARCHAR2(255) NOT NULL,
    manual      INTEGER NULL,
    cert        RAW(255) NULL
)

Statement:

INSERT INTO legoset (ID, NAME, MANUAL) VALUES (:P0_id, :P1_name, :P2_manual)

Code to reproduce:

ConnectionFactory connectionFactory = ConnectionFactories.get(options);

Connection connection = Mono.from(connectionFactory.create()).block();

Flux.from(connection.createStatement("DROP TABLE legoset").execute())
		.flatMap(Result::getRowsUpdated).onErrorResume(it -> Mono.empty()).blockLast();

String CREATE_TABLE_LEGOSET = "CREATE TABLE legoset (\n" //
	+ "    id          INTEGER PRIMARY KEY,\n" //
	+ "    version     INTEGER NULL,\n" //
	+ "    name        VARCHAR2(255) NOT NULL,\n" //
	+ "    manual      INTEGER NULL,\n" //
	+ "    cert        RAW(255) NULL\n" //
	+ ")";

Flux.from(connection.createStatement(CREATE_TABLE_LEGOSET).execute())
		.flatMap(Result::getRowsUpdated).blockLast();


Flux.from(connection.createStatement("INSERT INTO legoset (ID, NAME, MANUAL) VALUES (:P0_id, :P1_name, :P2_manual)")
				.bind("P0_id", 42055)
				.bind("P1_name", "SCHAUFELRADBAGGER")
				.bind("P2_manual", 12)
				.returnGeneratedValues()
				.execute()).flatMap(Result::getRowsUpdated)
		.blockLast();

Stack trace:

java.lang.NullPointerException
	at oracle.jdbc.driver.OracleStatement.getMoreResults(OracleStatement.java:5851)
	at oracle.jdbc.driver.OracleStatementWrapper.getMoreResults(OracleStatementWrapper.java:298)
	at oracle.r2dbc.impl.OracleStatementImpl$JdbcReturningGenerated.lambda$executeJdbc$0(OracleStatementImpl.java:1582)
	at oracle.r2dbc.impl.AsyncLock.lambda$get$2(AsyncLock.java:161)
	at oracle.r2dbc.impl.AsyncLock.unlock(AsyncLock.java:122)
	at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.terminate(AsyncLock.java:510)
	at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.onComplete(AsyncLock.java:496)
	at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
	at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onComplete(FlowAdapters.java:228)
	at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitComplete(CompletionStageUtil.java:681)
	at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitItems(CompletionStageUtil.java:628)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Flux.blockLast(Flux.java:2645)
		at org.springframework.data.r2dbc.core.Repro.reproducer(Repro.java:86)
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
		at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
		at java.base/java.lang.reflect.Method.invoke(Method.java:566)
		at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
		at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
		at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
		at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)

AQ support via R2DBC

Maybe it's premature, but I can hope :)

One awesome Oracle feature that could greatly profit from R2DBC support is Oracle AQ. Instead of blocking on DBMS_AQ.DEQUEUE calls, I could imagine a Publisher per queue that clients could subscribe to in various ways. Are there any plans for this yet, or too soon?

Getting ORA-12514, TNS:listener does not currently know of service requested in connect descriptor

HI team,

Getting this below error while connecting to remote oracle instance using r2dbc. attaching pom.xml

Failed to obtain R2DBC Connection; nested exception is io.r2dbc.spi.R2dbcTransientResourceException: [12514] [08006] Listener refused the connection with the following error: ORA-12514, TNS:listener does not currently know of service requested in connect descriptor (CONNECTION_ID=PzBG3pX8QteIbP4da43kpA==)
org.springframework.dao.DataAccessResourceFailureException: Failed to obtain R2DBC Connection; nested exception is io.r2dbc.spi.R2dbcTransientResourceException: [12514] [08006] Listener refused the connection with the following error:
ORA-12514, TNS:listener does not currently know of service requested in connect descriptor
(CONNECTION_ID=PzBG3pX8QteIbP4da43kpA==)
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.lambda$getConnection$0(ConnectionFactoryUtils.java:88)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler com.example.demo.StudentController#getAllTaskStateEnum() [DispatcherHandler]
|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP GET "/students/getAllTaskStateEnum" [ExceptionHandlingWebHandler]
Stack trace:
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.lambda$getConnection$0(ConnectionFactoryUtils.java:88)
at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3676)


4.0.0

org.springframework.boot
spring-boot-starter-parent
2.6.0-M1


com.example
demo-2
0.0.1-SNAPSHOT
demo-2
Demo project for Spring Boot

<java.version>11</java.version>



org.springframework.boot
spring-boot-starter-actuator


org.springframework.boot
spring-boot-starter-data-r2dbc


org.springframework.boot
spring-boot-starter-webflux

	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-devtools</artifactId>
		<scope>runtime</scope>
		<optional>true</optional>
	</dependency>
	<dependency>
		<groupId>com.oracle.database.r2dbc</groupId>
		<artifactId>oracle-r2dbc</artifactId>
		<version>0.2.0</version>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<optional>true</optional>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-test</artifactId>
		<scope>test</scope>
	</dependency>
</dependencies>

<build>
	<plugins>
		<plugin>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-maven-plugin</artifactId>
			<configuration>
				<excludes>
					<exclude>
						<groupId>org.projectlombok</groupId>
						<artifactId>lombok</artifactId>
					</exclude>
				</excludes>
			</configuration>
		</plugin>
	</plugins>
</build>
<repositories>
	<repository>
		<id>spring-milestones</id>
		<name>Spring Milestones</name>
		<url>https://repo.spring.io/milestone</url>
		<snapshots>
			<enabled>false</enabled>
		</snapshots>
	</repository>
</repositories>
<pluginRepositories>
	<pluginRepository>
		<id>spring-milestones</id>
		<name>Spring Milestones</name>
		<url>https://repo.spring.io/milestone</url>
		<snapshots>
			<enabled>false</enabled>
		</snapshots>
	</pluginRepository>
</pluginRepositories>

Cannot fetch a NULL literal from a Result

I'm trying to fetch the result of a NULL literal without explicit type from an R2DBC result, but I can't seem to do it:

System.out.println((
    Flux.from(connectionFactory.create())
        .flatMap(c -> c
            .createStatement("select null from dual")
            .execute())
        .flatMap(it -> it.map((r, m) -> r.get(0, Integer.class)))
        .collectList()
        .block().get(0)
));

This produces:

Exception in thread "main" java.lang.NullPointerException: Row mapping function returned null
	at oracle.jdbc.driver.InsensitiveScrollableResultSet$RowPublisher.mapCurrentRow(InsensitiveScrollableResultSet.java:1302)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Mono.block(Mono.java:1703)

When I try fetching binary data:

System.out.println((
    Flux.from(connectionFactory.create())
        .flatMap(c -> c
            .createStatement("select null from dual")
            .execute())
        .flatMap(it -> it.map((r, m) -> r.get(0, ByteBuffer.class)))
        .collectList()
        .block().get(0)
));

I'm getting:

Exception in thread "main" java.lang.IllegalArgumentException: java.sql.SQLException: Ungültiger Spaltentyp
	at oracle.r2dbc.impl.OracleReactiveJdbcAdapter$OracleJdbcRow.getObject(OracleReactiveJdbcAdapter.java:1193)
	at oracle.r2dbc.impl.OracleRowImpl.getByteBuffer(OracleRowImpl.java:272)
	at oracle.r2dbc.impl.OracleRowImpl.convertColumnValue(OracleRowImpl.java:245)
	at oracle.r2dbc.impl.OracleRowImpl.get(OracleRowImpl.java:137)
	at org.jooq.testscripts.R2DBC.lambda$2(R2DBC.java:30)
	at oracle.r2dbc.impl.OracleResultImpl$2.lambda$publishRows$0(OracleResultImpl.java:119)
	at oracle.r2dbc.impl.OracleReactiveJdbcAdapter.lambda$publishRows$10(OracleReactiveJdbcAdapter.java:698)
	at oracle.jdbc.driver.InsensitiveScrollableResultSet$RowPublisher.mapCurrentRow(InsensitiveScrollableResultSet.java:1299)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Mono.block(Mono.java:1703)
		at org.jooq.testscripts.R2DBC.main(R2DBC.java:32)
Caused by: java.sql.SQLException: Ungültiger Spaltentyp
	at oracle.jdbc.driver.Redirector$99.redirect(Redirector.java:1448)
	at oracle.jdbc.driver.Redirector$99.redirect(Redirector.java:1444)
	at oracle.jdbc.driver.Representation.getObject(Representation.java:567)
	at oracle.jdbc.driver.Accessor.getObject(Accessor.java:1025)
	at oracle.jdbc.driver.OracleStatement.getObject(OracleStatement.java:6827)
	at oracle.jdbc.driver.InsensitiveScrollableResultSet$RowPublisher$ExpiringRow.getObject(InsensitiveScrollableResultSet.java:1390)
	at oracle.r2dbc.impl.OracleReactiveJdbcAdapter$OracleJdbcRow.getObject(OracleReactiveJdbcAdapter.java:1185)

I'm using:

<dependency>
    <groupId>com.oracle.database.r2dbc</groupId>
    <artifactId>oracle-r2dbc</artifactId>
    <version>0.1.0</version>
</dependency>        
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-spi</artifactId>
    <version>0.9.0.M1</version>
</dependency>

Memory Leak Suspect in ForwardOnlyResultSet

Hi, i am so glad to be using this wonderful reactive driver for Oracle!
Thanks

Application

  • Java Version: 17.0.2 openjdk
  • oracle-r2dbc Version: 1.0.0
  • r2dbc-pool Version: 0.9.1
  • ojdbc Version: ojdbc11:21.3.0.0

Description

I have in production an application which, servers a lot of data, and, have many hits per second.
Basically, it uses cache-aside technique with redis.
So, go redis, key not in redis, go to oracle, do some mappings, store it into redis.

It works like a charm, but, when the application is running on a day without interruption, on our apm, I start to see a memory consumption growth, and the garbage collector can not clean the memory and, the memory average starts to growth, and growth and growth.

I took a heap dump, and, with eclipse memory analyzer I saw that, the heap has a lot of ForwardOnlyResultSet objects, which, it seems that its not closed after the use.

Here, we can see that we have a lot of instances of ForwardOnlyResultSet
image

Since, we are using r2dbc-pool, with a pool of 5 connections,
image

I might be missing something, and forgetting to close the result set. Which, is something that I would to on pure jdbc. But, I havent seen nothing here in the docs about that.

Example of use

I suspect that I am not closing the result set below, I am just closing(returning the connection back to the pool) after each statement

    public Mono<CouponDto> fetchCoupon(final long itemId, final long couponId, final long subsidiaryId) {
        return Flux.usingWhen(
                connectionPool.create(),
                connection -> Mono.from(connection.createStatement(QUERY)
                        .bind(0, subsidiaryId)
                        .bind(1, itemId)
                        .bind(2, couponId)
                        .execute()
                ).flatMapMany(it -> it.map(mapper)),
                Connection::close,
                ((connection, throwable) -> connection.close()),
                Connection::close
        ).next();
    }

    private final BiFunction<Row, RowMetadata, CouponDto> mapper = (row, rowMetadata) ->
            new CouponDto(
                    row.get("id", Long.class),
                    row.get("discountPercentage", Double.class),
                    row.get("listPrice", Double.class),
                    row.get("discountType", Integer.class)
            );

Again, thanks for your help & time!
Best Regards, Matheus Rambo

v0.2.0 with ReactiveCrudRepository

Recently, I updated oracle r2dbc to v 0.2.0 and no queries are able to execute. It just hangs.
last records in logs

2021-06-16 23:16:18,861 [reactor-http-nio-4] DEBUG org.springframework.core.log.LogFormatUtils [traceDebug] [reactor-http-nio-4] [] : [acae8c1f-1, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:53282] HTTP GET "/api/getToken?token=asdasd"
2021-06-16 23:16:18,878 [reactor-http-nio-4] DEBUG org.springframework.web.reactive.handler.AbstractHandlerMapping [lambda$getHandler$1] [reactor-http-nio-4] [] : [acae8c1f-1, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:53282] Mapped to acme.TokenController#request(String)
2021-06-16 23:16:20,482 [ForkJoinPool.commonPool-worker-3] DEBUG org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec [lambda$execute$2] [ForkJoinPool.commonPool-worker-3] [] : Executing SQL statement [SELECT ssn.ssn_id, ssn.a_tkn, ssn.a_typ, ssn.a_id, ssn.a_sbj FROM ssn WHERE ssn.a_tkn = :P0_atkn]

after the last record, it is just hangs

If I change the version back, it works

I am using ReactiveCrudRepository and spring-boot-starter-data-r2dbc

Deadlock results from Batch.execute()

Executing Oracle R2DBC's implementation of io.r2dbc.spi.Batch can result in a deadlock. The cause is rooted in Oracle JDBC's use of locks to ensure thread safe APIs.

A stack trace similar to the following is a symptom:

"ForkJoinPool.commonPool-worker-3" #16 daemon prio=5 os_prio=0 cpu=844.91ms elapsed=2204.18s tid=0x00007fb818004800 nid=0xbb1 waiting on condition  [0x00007fb80dfb8000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
	- parking to wait for  <0x00000000e13d1a38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await([email protected]/AbstractQueuedSynchronizer.java:2081)
	at oracle.jdbc.driver.RestrictedLock.awaitRestrictedModeExit([email protected]/RestrictedLock.java:208)
	at oracle.jdbc.driver.RestrictedLock.lock([email protected]/RestrictedLock.java:140)
	at oracle.jdbc.internal.Monitor.acquireLock([email protected]/Monitor.java:131)
	at oracle.jdbc.internal.Monitor.acquireCloseableLock([email protected]/Monitor.java:109)
	at oracle.jdbc.driver.InsensitiveScrollableResultSet$RowPublisher$ExpiringRow.getObject([email protected]/InsensitiveScrollableResultSet.java:1388)
	at oracle.r2dbc.impl.OracleReactiveJdbcAdapter$OracleJdbcRow.getObject([email protected]/OracleReactiveJdbcAdapter.java:1185)
	at oracle.r2dbc.impl.OracleRowImpl.convertColumnValue([email protected]/OracleRowImpl.java:251)
	at oracle.r2dbc.impl.OracleRowImpl.get([email protected]/OracleRowImpl.java:137) 

The deadlock comes about as follows, where a Batch of two SQL commands is executed and Oracle JDBC's thread pool consists of 1 thread:

  1. An invocation of OraclePreparedStatement.executeAsyncOracle() executes the 2nd SQL command in the batch. (The first command has already executed).
  2. The executeAsyncOracle method acquires the Oracle JDBC Connection's lock and initiates a database call.
  3. A callback is registered that will release the lock after the database response is received.
  4. On the executor thread, an oracle.jdbc.OracleRow resulting from the 1st SQL command of the batch is processed by a mapping function.
  5. The function invokes OracleRow.getObject(int, Class).
  6. The implementation of getObject attempts to acquire the Oracle JDBC Connection's lock, but the lock-releasing callback described in step 3 has not executed yet.
  7. The executor thread is blocked until the lock is released.
  8. The database response is received and the lock-releasing callback described in step 1 is submitted to the executor.
  9. The lock-releasing callback can not progress until the executor thread becomes available.
  10. The executor thread can not become available until the getObject-calling task can acquire the lock.
  11. DEADLOCK: The locker-releaser needs the getObject-caller's thread to progress. The getObject-caller needs the locker-releaser's lock to progress.

Note that this issue exists regardless of the thread pool size. If enough Batch results are being processed concurrently, there is potential to exhaust the thread pool as more threads become blocked waiting for the connection lock.

Note that the general limitation of blocked threads arising from concurrent calls on one connection is known. The Oracle R2DBC documentation advises programmers against doing this. Resolving this limitation is outside the scope of this issue.

This issue is filed because Oracle R2DBC's Batch implementation is not respecting it's own limitations. The implementation is executing a SQL command while application code is processing the result of another command concurrently.

Transaction isolation

Hi, just wondering where support for transaction isolation levels other than READ COMMITED might be on the roadmap? Thanks.

Open connection validation needs to be inside of the subscribed stream

Hello,

First, and completely out of scope for a github issue, I've enjoyed both parts of your Reactive Summit presentation.

This is a little bit of a drive-by comment because I am not an Oracle R2DBC driver user, but we've ran into similar scenarios when implementing the Cloud Spanner R2DBC driver. In OracleConnectionImpl (for example when beginning a transaction), the validation of whether the connection is currently open happens in the beginning of the method, outside of the reactive stream creation. That can lead to the validation passing at the time the method is called (when the reactive stream operation chain is defined), but the connection becoming closed by the time the resulting publisher is subscribed to.

Is Oracle support back ?

Hi,
It has been unclear if there is Oracle database support for spring r2dbc. Is it active after Oracles odbc extention api?
When is the first lease planned ?

java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.lang.Integer

I'm still working on some tests with this driver, and whenever I need to read a numeric value from result set and cast it to Int like below:

    return r2dbcTemplate
               .databaseClient
               .sql("select count(1) as total_orders from orders")
               .fetch()
               .one()
               .flatMap { row ->
                   Mono.just(row["total_orders"] as Int)
               }

I'm getting this error:

 java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.lang.Integer (java.math.BigDecimal and java.lang.Integer are in module java.base of loader 'bootstrap')
        ...
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:488) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:421) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:185) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredComplete(FluxUsingWhen.java:405) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:540) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.Operators$MonoSubscriber.onComplete(Operators.java:1857) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onComplete(MonoIgnoreThen.java:323) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.Operators$MonoSubscriber.onComplete(Operators.java:1857) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onComplete(MonoIgnoreThen.java:323) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:678) ~[reactor-pool-0.2.3.jar!/:0.2.3]
 	at reactor.core.publisher.Operators.complete(Operators.java:136) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.Mono.subscribe(Mono.java:4099) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:790) ~[reactor-pool-0.2.3.jar!/:0.2.3]
 	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:154) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:154) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.Mono.subscribe(Mono.java:4099) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:83) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onError(FluxFilter.java:291) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.Operators.error(Operators.java:197) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.Mono.subscribe(Mono.java:4099) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:397) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057) ~[reactor-core-3.4.4.jar!/:3.4.4]
 	at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onComplete(FlowAdapters.java:228) ~[reactive-streams-1.0.3.jar!/:na]
 	at oracle.jdbc.driver.PhasedPublisher$PhasedSubscription.emitComplete(PhasedPublisher.java:434) ~[ojdbc11-21.1.0.0.jar!/:21.1.0.0.0]
 	at oracle.jdbc.driver.PhasedPublisher.lambda$subscribe$3(PhasedPublisher.java:343) ~[ojdbc11-21.1.0.0.jar!/:21.1.0.0.0]
 	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[na:na]
 	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[na:na]
 	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[na:na]
 	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713) ~[ojdbc11-21.1.0.0.jar!/:21.1.0.0.0]
 	at java.base/java.security.AccessController.doPrivileged(AccessController.java:391) ~[na:na]
 	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711) ~[ojdbc11-21.1.0.0.jar!/:21.1.0.0.0]
 	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1429) ~[na:na]
 	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[na:na]
 	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016) ~[na:na]
 	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665) ~[na:na]
 	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598) ~[na:na]
 	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[na:na]

Off course, it is possible to use BigDecimal for any numeric value (and then converting it to Int/Long, etc), but I don't know if this is some compatibility error between spring/ojdbc11/oracle-r2dbc, or something expected at this moment.

Are other numeric types (besides BigDecimal) supported already?

IllegalStateException: One or more parameters are not set on reusing named parameter multiple times

It seems that using a named parameter multiple times isn't working properly.

Consider the following statement:

SELECT id, name, manual FROM legoset WHERE name = :P0_name or name = :P0_name

Trying to run the statement with:

Mono.from(connectionFactory.create())
		.flatMapMany(it -> {

	return it.createStatement("SELECT id, name, manual FROM legoset WHERE name = :P0_name or name = :P0_name")
		.bind("P0_name", "unknown")
		.execute();
	}).flatMap(it -> it.map((row, rowMetadata) -> row.get(0))).blockLast();

fails with:

java.lang.IllegalStateException: One or more parameters are not set

	at oracle.r2dbc.impl.OracleStatementImpl.requireAllParametersSet(OracleStatementImpl.java:790)
	at oracle.r2dbc.impl.OracleStatementImpl.add(OracleStatementImpl.java:341)
	at oracle.r2dbc.impl.OracleStatementImpl.execute(OracleStatementImpl.java:458)

Running the statement as SELECT id, name, manual FROM legoset WHERE name = :P0_name works fine.

java.lang.ClassCastException when saving entitiy

Hello, i have a simple User entity class.
when i tried to save, i hit this exception. tired using different other database like Mysql, and there isn't such exception.

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.lang.Integer (java.lang.Long and java.lang.Integer are in module java.base of loader 'bootstrap')
Caused by: java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.lang.Integer (java.lang.Long and java.lang.Integer are in module java.base of loader 'bootstrap')
	at java.base/java.util.stream.Collectors.lambda$summingInt$19(Collectors.java:681) ~[na:na]
	at reactor.core.publisher.MonoStreamCollector$StreamCollectorSubscriber.onNext(MonoStreamCollector.java:132) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxConcatArray$ConcatArrayDelayErrorSubscriber.onNext(FluxConcatArray.java:364) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:360) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxConcatArray$ConcatArrayDelayErrorSubscriber.request(FluxConcatArray.java:461) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:964) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxConcatArray$ConcatArrayDelayErrorSubscriber.onSubscribe(FluxConcatArray.java:350) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:87) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:265) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8466) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxConcatArray$ConcatArrayDelayErrorSubscriber.onComplete(FluxConcatArray.java:443) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:73) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8466) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:201) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:189) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:172) ~[reactor-core-3.4.22.jar:3.4.22]
	at oracle.r2dbc.impl.AsyncLock.lambda$get$2(AsyncLock.java:167) ~[oracle-r2dbc-1.0.0.jar:1.0.0]
	at oracle.r2dbc.impl.AsyncLock.unlock(AsyncLock.java:125) ~[oracle-r2dbc-1.0.0.jar:1.0.0]
	at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.terminate(AsyncLock.java:516) ~[oracle-r2dbc-1.0.0.jar:1.0.0]
	at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.onComplete(AsyncLock.java:502) ~[oracle-r2dbc-1.0.0.jar:1.0.0]
	at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123) ~[reactor-core-3.4.22.jar:3.4.22]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058) ~[reactor-core-3.4.22.jar:3.4.22]
	at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onComplete(FlowAdapters.java:221) ~[reactive-streams-1.0.4.jar:na]
	at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitComplete(CompletionStageUtil.java:804) ~[ojdbc11-21.7.0.0.jar:21.6.0.0.0]
	at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitItems(CompletionStageUtil.java:751) ~[ojdbc11-21.7.0.0.jar:21.6.0.0.0]
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395) ~[na:na]
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) ~[na:na]
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) ~[na:na]
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) ~[na:na]
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) ~[na:na]
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) ~[na:na]

This is my Repository class

@Repository
interface UsersRepository : ReactiveCrudRepository<Users, Long> {

    @Query("SELECT SYS_CONTEXT('USERENV','CURRENT_SCHEMA') tschema FROM DUAL")
    fun getCurrentSchema(): Mono<String>
}

This is my Entity Class

@Table(name = "users")
class Users {

    @Id
    @Column
    @GeneratedValue(strategy= GenerationType.IDENTITY)
    var id: Long? = null

    @Column
    var username: String? = null

    @Column
    var email: String? = null

    @Column
    var password: String? = null

    @Column
    var resetUuid: String? = null

    @Column
    var expiryDatetime: LocalDateTime? = null
}

Unhelpful Error Message if config.properties is missing

The test suite reads database connection configuration from a "config.properties" file. If this file does not exist, then the DatabaseConfig class fails to initialize, and test execution will report an error like this:

[ERROR] oracle.r2dbc.OracleTestKit.compoundStatement  Time elapsed: 0 s  <<< ERROR!
java.lang.NoClassDefFoundError: Could not initialize class oracle.r2dbc.DatabaseConfig
	at [email protected]/oracle.r2dbc.OracleTestKit.<init>(OracleTestKit.java:94)

This error message is not helpful because it doesn't inform the reader that the config.properties file is missing.

Cannot change password of connection factory after creation

I'm using this driver in a r2dbc connection pool which is meant to have variable size.
The password for the database I'm accessing rotates hourly...
I was hoping to extend the classes

https://github.com/oracle/oracle-r2dbc/blob/main/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java
https://github.com/oracle/oracle-r2dbc/blob/main/src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java

to use an extended version of OracleDataSource where the getConnection() functions can call out to retrieve updated passwords.

However, it seems nearly everything in this lib is a final or protected class and cannot be extended...

Is there a better solution to this? Do I have to just destroy my connection pool every time it fails to create a new connection and recreate it entirely?

Would it be possible to make these classes public and non-final? Or even allow a supplier like Mono<String> for the password?

LOCK_WAIT_TIMEOUT

Getting error as java.long.NosSuchFieldError : LOCK_WAIT_TIMEOUT exception..

NullPointer when using R2DBC Pool 0.9.0.RELEASE with version 0.4.0

Hi, I am facing a weird problem using:

Java Version: Oracle JDK 17

implementation 'io.r2dbc:r2dbc-pool:0.9.0.RELEASE'
implementation 'io.r2dbc:r2dbc-spi:0.9.0.RELEASE'
runtimeOnly 'com.oracle.database.r2dbc:oracle-r2dbc:0.4.0'

First of all, I was using the version 0.1.0 with R2DBC POOL, and, it works when executing a single statement, but, with paralell calls, I face the problem with: Multiple subscribers ...

So, I came here, and, read that thats a know problem with version 0.1.0. Then, I upgrade to the version 0.4.0 which, is the latest.
But, trying to execute the same query(which works with version 0.1.0) , but, I am receiving a NullPointer inside oracle r2dbc classes.

StackTrace:

java.lang.NullPointerException: Cannot invoke "java.util.ArrayDeque.size()" because "this.implicitResultSetStatements" is null
	at oracle.jdbc.driver.OracleStatement.getMoreResults(OracleStatement.java:5851)
	at oracle.jdbc.driver.OracleStatementWrapper.getMoreResults(OracleStatementWrapper.java:298)
	at oracle.r2dbc.impl.OracleStatementImpl$JdbcStatement.lambda$getResults$4(OracleStatementImpl.java:1053)
	at oracle.r2dbc.impl.AsyncLock.lambda$get$2(AsyncLock.java:161)
	at oracle.r2dbc.impl.AsyncLock.unlock(AsyncLock.java:122)
	at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.terminate(AsyncLock.java:510)
	at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.onComplete(AsyncLock.java:496)
	at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
	at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onComplete(FlowAdapters.java:228)
	at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitComplete(CompletionStageUtil.java:681)
	at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitItems(CompletionStageUtil.java:628)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713)
	at java.base/java.security.AccessController.doPrivileged(AccessController.java:399)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

How I execute the query:

return Mono.usingWhen(
        connectionPool.create(),
        connection -> Mono.from(connection.createStatement(QUERY)
                .bind(0, destinationState)
                .bind(1, subsidiaryId)
                .bind(2, itemId)
                .execute()
        ),
        Connection::close,
        ((connection, throwable) -> connection.close()),
        Connection::close
)
        .flatMapMany(it -> it.map(mapper))
        .next();

How I create the connection pool:

public ConnectionPool connectionFactory() {
    return new ConnectionPool(ConnectionPoolConfiguration
            .builder()
            .connectionFactory(ConnectionFactories.get(
                    ConnectionFactoryOptions
                            .builder()
                            .from(ConnectionFactoryOptions.parse(url))
                            .option(ConnectionFactoryOptions.USER, user)
                            .option(ConnectionFactoryOptions.PASSWORD, password)
                            .option(ConnectionFactoryOptions.DRIVER, DRIVER)
                            .option(Option.valueOf("applicationName"), "catalog-service-app")
                            .build()
                    )
            )
            .initialSize(INITIAL_CONNECTIONS)
            .maxSize(maxConnections)
            .maxIdleTime(Duration.ofSeconds(maxIdleTime))
            .validationQuery(VALIDATION_QUERY)
            .build()
    );
}

I thought that I would be a problem with dependency versions, but, I checked the dependencies and, I am using the correct ones.
Please, tell me where I am making a mistake

Dependencies:
image

Question: DELETE command with RETURNING INTO

Quote from the readme: "Returning generated values is only supported for INSERT and UPDATE commands when a RETURNING INTO clause can be appended to the end of that command."

Is there any plan for implementing this feature for DELETE commands? To support such statement:
"DELETE cars RETURNING listagg(distinct cars.registration_number, ',') INTO :registration_numbers;"

spring r2dbc run with error “LOCK_WAIT_TIMEOUT”

error:
Caused by: java.lang.NoSuchFieldError: LOCK_WAIT_TIMEOUT
at oracle.r2dbc.impl.OracleConnectionFactoryImpl.(OracleConnectionFactoryImpl.java:186) ~[oracle-r2dbc-1.0.0.jar:1.0.0]
at oracle.r2dbc.impl.OracleConnectionFactoryProviderImpl.create(OracleConnectionFactoryProviderImpl.java:99) ~[oracle-r2dbc-1.0.0.jar:1.0.0]
at io.r2dbc.spi.ConnectionFactories.find(ConnectionFactories.java:112) ~[r2dbc-spi-0.8.3.RELEASE.jar:na]
at io.r2dbc.spi.ConnectionFactories.get(ConnectionFactories.java:142) ~[r2dbc-spi-0.8.3.RELEASE.jar:na]
at org.springframework.boot.autoconfigure.r2dbc.ConnectionFactoryBuilder.build(ConnectionFactoryBuilder.java:125) ~[spring-boot-autoconfigure-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.springframework.boot.autoconfigure.r2dbc.ConnectionFactoryConfigurations.createConnectionFactory(ConnectionFactoryConfigurations.java:56) ~[spring-boot-autoconfigure-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at org.springframework.boot.autoconfigure.r2dbc.ConnectionFactoryConfigurations$Pool.connectionFactory(ConnectionFactoryConfigurations.java:68) ~[spring-boot-autoconfigure-2.3.7.RELEASE.jar:2.3.7.RELEASE]
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:577) ~[na:na]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ~[spring-beans-5.2.12.RELEASE.jar:5.2.12.RELEASE]
... 68 common frames omitted

config:

应用名称

spring.application.name=test-r2dbc
#spring.r2dbc.url=r2dbc:oracle:thin://10.19.1.208:1521:szwz
spring.r2dbc.url=r2dbc:oracle:thin://localhost:1521:orcl
#spring.r2dbc.url=r2dbc:mysql://localhost:3306/test?ssl=false
spring.r2dbc.username=***
spring.r2dbc.password=***
spring.r2dbc.pool.max-size=100
spring.r2dbc.pool.max-idle-time=2m
spring.r2dbc.pool.initial-size=100
spring.r2dbc.pool.validation-query=select 1

Mapping Blob/Clob to ByteBuffer/String is not supported

Not supporting BLOB/CLOB to ByteBuffer/String mapping is a known limitation of Oracle R2DBC. Only io.r2dbc.spi.Blob/Clob mapping is supported. This is noted in the documentation, but the reasoning is not explained in depth. So I’d like to share the full extent of my thoughts on this, and see if we can get to a better solution.

When a query returns a LOB value, Oracle Database returns a locator for that value. For the purpose of this discussion, we can think of the locator as a pointer to the actual value. To get the actual value into a buffer, a database driver makes another database call requesting to read the value which a locator points to.
Although the LOB read requires a database call, Oracle R2DBC could still support the ByteBuffer mapping without Row.get(int/String, ByteBuffer.class) having to make a blocking database call. Before emitting the Row to the mapping function, the driver could execute non-blocking database calls to read the LOB content into a buffer. After the content had been buffered, then the Row can be input to the mapping function and the ByteBuffer would be ready to go.
Of course, if the LOB exceeded 2GB, then it would not fit into a ByteBuffer and the driver would need to handle that. But we can ignore this case for the moment, as it doesn't completely prevent Oracle R2DBC from supporting the ByteBuffer mapping.

So, pre-buffering the LOB content is one option to consider. However, this approach seems to devalue the case where user code wants to use io.r2dbc.spi.Blob. Rather than have Blob.stream() respond to backpressure from a Subscriber, the stream() implementation has decided to allocate memory for the entire content of the LOB before a Subscriber has even subscribed.

On the other hand, if Oracle R2DBC only supports the io.r2dbc.spi.Blob mapping, and user code want to map that into a ByteBuffer, it still has the freedom to implement that mapping. If the user code knows that the BLOB value won’t exhaust memory, or exceed the 2GB ByteBuffer capacity, then it can map the BLOB into a ByteBuffer like this:

Flux.usingWhen(ConnectionFactories.get(
  "r2dbc:oracle://test:test@localhost:1521/xepdb1")
  .create(),
  connection ->
    Flux.from(connection.createStatement("DROP TABLE BlobTest")
      .execute())
      .flatMap(Result::getRowsUpdated)
      .onErrorResume(throwable ->
        // If the table doesn't exist, don't emit ORA-00942
        (throwable instanceof R2dbcException)
        && ((R2dbcException)throwable).getErrorCode() == 942,
        throwable -> Mono.empty())
      .thenMany(connection.createStatement(
        "CREATE TABLE BlobTest (value BLOB)")
        .execute())
      .flatMap(Result::getRowsUpdated)
      .thenMany(connection.createStatement(
        "INSERT INTO BlobTest VALUES(?)")
        .bind(0, ByteBuffer.wrap(new byte[128_000]))
        .execute())
        .flatMap(Result::getRowsUpdated)
      .thenMany(connection.createStatement(
        "SELECT value FROM BlobTest")
        .execute())
      .flatMap(result ->
        result.map((row, metadata) -> row.get(0, Blob.class)))
      .flatMap(blob ->
        Flux.from(blob.stream())
          .reduce(ByteBuffer.allocate(8192),
            (buffer, next) -> {
              // Ensure capacity for all remaining bytes in the next buffer.
              if (buffer.remaining() < next.remaining()) {
                ByteBuffer newBuffer = ByteBuffer.allocate(Math.max(
                    buffer.capacity()
                      + (next.remaining() - buffer.remaining()),
                    buffer.capacity() << 1));
                buffer = newBuffer.put(buffer.flip());
              }

              buffer.put(next);
              return buffer;
            })),
  Connection::close)
  .toStream()
  .map(ByteBuffer::flip)
  .forEach(System.out::println);

As shown above, the ability for user code to implement a Blob to ByteBuffer mapping is what ultimately lead to the decision for Oracle R2DBC to only support Blob mapping. With Blob, user code still has the option to map it into a ByteBuffer if wants to, but user code can also choose to process the Blob as a stream of smaller buffers if it wants to do that instead.

So far, we’ve only considered solutions that represent two extremes. Either: A) Buffer everything, or B) Buffer nothing. Option C might look like this:

  • Let X be a reasonable buffer size, one that should not exhaust the memory resources of modern hardware when there are N instances of X, and N is a reasonable upper bound for the number of database connections.
  • The Oracle driver buffers X bytes from a BLOB into memory before emitting a Row to the mapping function.
  • The mapping function calls Row.get(…, T), where T may be a ByteBuffer or Blob
    • If T is Blob.class, then return a Blob instance with X bytes pre-buffered.
    • If T is ByteBuffer.class, then let L be the length of the BLOB.
      • If L > 2GB: Throw an exception. The BLOB won’t fit into a ByteBuffer
      • If L > X: Execute a blocking database call to read the remaining length of the BLOB into the buffer.
      • Else: Return the fully-buffered BLOB.

I find the solution described above to be problematic because the cases where errors and blocking database calls occur seem like like pitfalls that are easy to miss. It seems too likely that a system would be verified by tests that miss the case where a LOB exceeds 2GB, and then fail in production when the >2GB case occurs. And for blocking database calls, that’s really hard to detect unless you have something like Java Mission control to measure socket read time.
Although having to implement a ByteBuffer mapping with something like the reduce operator above puts some burden on user code, it seemed like a better alternative than to introduce the pitfalls I’ve described.

Of course, it would be excellent if Oracle R2DBC could support the ByteBuffer mapping. I’m happy to discuss new solutions with anyone that wants to explore this further.

Support oracle.jdbc.timezoneAsRegion driver parameter

When using the docker image provided at https://registry.hub.docker.com/r/gvenzl/oracle-xe with Testcontainers, specifically the gvenzl/oracle-xe:11-slim image, connections fail if the JVM's timezone is set to Etc/UTC.

Setting the property oracle.jdbc.timezoneAsRegion as a system property, as described here, https://gist.github.com/jarek-przygodzki/cbea3cedae3aef2bbbe0ff6b057e8321 is a fix.

It would be nice if this property were supported directly by this driver, so it could be scoped to the connection.

How do you set the current schema?

In the samples you provide it looks like it commands are operating off the default schema (or it's set in a way I can't divine).

Can you provide an example on how to change the schema?

Spring + Oracle r2dbc and r2dbc pool

@neibarbosa Thanks for sharing this. Providing a code sample and details about how to recreate the issue is very much appreciated!

I think that you are testing with the 0.1.0 version of Oracle R2DBC. Is this correct?
The 0.1.0 release did not integrate with r2dbc-pool for the reason you have noticed: The Publisher returned by ConnectionFactory.create() did not support multiple subscribers. The r2dbc-pool project expects the Publisher to support multiple subscribers.

In the 0.2.0 release, we fixed this so that multiple subscribers are now supported. With this fix, Oracle R2DBC should now work with r2dbc-pool. The pull request for that fix is here: #24

You might want to try testing with oracle-r2dbc:0.2.0 and r2dbc-pool:0.9.0.M1. These versions both support the 0.9.0.M1 R2DBC SPI.

Originally posted by @Michael-A-McMahon in #29 (comment)

I understand that I have to use oracle-r2dbc:0.2.0 and r2dbc-pool:0.9.0.M1 together. However I am using Spring and according to their README.md I cannot use oracle-r2dbc:0.2.0, but oracle-r2dbc:0.1.0. What version should I use to for these dependencies to use them with Spring? I've created a project for testing https://github.com/7Mircea/R2dbc_Oracle_Spring.

Class Cast Exceptio when I call subscribe method on a Mono

Sample Code:

Exception takes place in the highlighted code in bold below

testSubscriptionMono.flatMap(testSubscription -> {
testSubscription.setMembershipStatus("ENROLLED");
return repositoryService.saveTestSubscription(testSubscription);
}).subscribe(value -> System.out.println("RECEIVED " + value),
error -> error.printStackTrace());

Depdendencies:

image

Error Logs:

java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.lang.Integer (java.lang.Long and java.lang.Integer are in module java.base of loader 'bootstrap')
at java.base/java.util.stream.Collectors.lambda$summingInt$19(Collectors.java:673)
at reactor.core.publisher.MonoStreamCollector$StreamCollectorSubscriber.onNext(MonoStreamCollector.java:132)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxConcatArray$ConcatArrayDelayErrorSubscriber.onNext(FluxConcatArray.java:364)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:360)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)
at reactor.core.publisher.FluxConcatArray$ConcatArrayDelayErrorSubscriber.request(FluxConcatArray.java:461)
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:964)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171)
at reactor.core.publisher.FluxConcatArray$ConcatArrayDelayErrorSubscriber.onSubscribe(FluxConcatArray.java:350)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:87)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:265)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxConcatArray$ConcatArrayDelayErrorSubscriber.onComplete(FluxConcatArray.java:443)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:73)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:201)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:189)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:172)
at oracle.r2dbc.impl.AsyncLock.lambda$get$2(AsyncLock.java:167)
at oracle.r2dbc.impl.AsyncLock.unlock(AsyncLock.java:125)
at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.terminate(AsyncLock.java:516)
at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.onComplete(AsyncLock.java:502)
at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onComplete(FlowAdapters.java:221)
at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitComplete(CompletionStageUtil.java:805)
at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitItems(CompletionStageUtil.java:752)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

R2DBC 0.1.0 : NoSuchMethodError: Flow$Publisher OracleConnectionBuilder.buildConnectionPublisherOracle()

I come to you because I am stuck with my Jhipster Spring Boot application,
which uses the oracle-r2dbc dependency, when I try to uses my Oracle dependency

  • In my case, I have problem when using dependency oracle-r2dbc version 0.1.0 (or even 0.2.0).
  • If I used version 0.3.0 or 0.4.0, I couldn't even start my application, having an error LOCK_WAIT_TIMEOUT at startup

My JHipster application also contains the dependency spring-boot-starter-data-r2dbc version 2.6.3 which contains spring-data-r2dbc v1.4.1, r2dbc-spi v0.8.6.RELEASE (which contains oracle-r2dbc version 0.1.0) and r2dbc-pool v0.8.8.RELEASE.

My Oracle Database version is Oracle Database 21c Express Edition Release 21.0.0.0.0 - Production

In my case I have no problem at startup time, liquibase scripts work well, but when calling database from R2dbc api.

Could you help me find a solution on how I could use r2dbc with my Oracle Database ?

Thanks a lot in advance

The following sections give more details about my application, my configuration files, the code that provoke the error and the error stack trace.

Short description of my application

I have multiple maven profiles. One is called "dev" and it uses embeded H2 database. Everything works fine with it. I created a new maven Profile called 'local' in order to connect to my local Oracle Database. No problem at startutp and new tables located in liquibase changelog files are properly created with data in my Oracle database without any errors. Oracle version: Oracle Database 21c Express Edition Release 21.0.0.0.0 - Production

But If i call one of my apis that need to execute some queries then an error is raised with Oracle. Although it works fine when using h2 database. Do you know how I could resolve this problem ?

Below are listed the details of the error and some relevant parts of my configuration files.

Thanks a lot in advance

The JAVA code that provokes the error

In my controller class

@GetMapping("/authorities")
public Mono<List<String>> getAuthorities() {
    return userService.getAuthorities().collectList();
}

In my service class

@Transactional(readOnly = true)
public Flux<String> getAuthorities() {
    return authorityRepository.findAll().map(Authority::getName);
}

The stacktrace of the error

    java.lang.NoSuchMethodError: 'java.util.concurrent.Flow$Publisher oracle.jdbc.OracleConnectionBuilder.buildConnectionPublisherOracle()'
        at oracle.r2dbc.impl.OracleReactiveJdbcAdapter.lambda$publishConnection$8(OracleReactiveJdbcAdapter.java:643)
        at oracle.r2dbc.impl.OracleR2dbcExceptions.getOrHandleSQLException(OracleR2dbcExceptions.java:267)
        at oracle.r2dbc.impl.OracleReactiveJdbcAdapter.lambda$deferOnce$23(OracleReactiveJdbcAdapter.java:1060)
        at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258)
        at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
        at reactor.core.publisher.Flux.subscribeWith(Flux.java:8642)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8439)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8363)
        at reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:423)
        at reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:558)
        at reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:268)
        at reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:432)

Here are some configurations files that might be important:

The pom.xml file

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-r2dbc</artifactId> <!--  version 2.6.3   -->
</dependency>

...

  <profile>
        <id>dev</id>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>             
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>com.h2database</groupId>
                <artifactId>h2</artifactId>
            </dependency>
            <dependency>
                <groupId>io.r2dbc</groupId>
                <artifactId>r2dbc-h2</artifactId>
            </dependency>
        </dependencies>
    </profile>
<profile>
    <id>local</id>           
    <dependencies> 
    <!-- database oracles -->    
    <dependency>
        <groupId>com.oracle.database.r2dbc</groupId>
        <artifactId>oracle-r2dbc</artifactId> <!--  version 0.1.0  -->
    </dependency>           
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-clean-plugin</artifactId>
                <configuration>
                    <filesets>
                        <fileset>
                            <directory>target/classes/static/</directory>
                        </fileset>
                    </filesets>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>build-info</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>io.github.git-commit-id</groupId>
                <artifactId>git-commit-id-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <properties>
        <!-- default Spring profiles -->
        <spring.profiles.active>local${profile.api-docs}${profile.tls}</spring.profiles.active>
    </properties>
</profile> 

The file application-local.yml

  liquibase:
    contexts: local, faker
    url: jdbc:oracle:thin:@localhost:1521/xe    
  mail:
    host: localhost
    port: 25
    username:
    password:
  r2dbc:
    url: r2dbc:oracle:thin://localhost:1521/xe
    username: MY_CREDIT    
    password: MY_CREDIT    

Other attempts

I tried to change the r2dbc url to r2dbc:oracle:thin://localhost:1521:xe (: at the end) but it doesn't change anything. I also tried to add other dependencies in the pom like the one below, but no differences:

    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc8</artifactId>
        <version>21.1.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.oracle.database.ha</groupId>
        <artifactId>ons</artifactId>
        <version>21.1.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ucp</artifactId>
        <version>21.1.0.0</version>
    </dependency>

Error when using r2dbc-pool: this publisher does not support multiple subscribers

I'm trying to use this driver with spring-boot-data-r2dbc (v2.4.4), but when I configure the connection factory to use the connection pool like this:

return ConnectionFactories.get(ConnectionFactoryOptions.builder()
                .option(ConnectionFactoryOptions.DRIVER, "pool")
                .option(ConnectionFactoryOptions.PROTOCOL, "oracle")
                .option(ConnectionFactoryOptions.HOST, "127.0.0.1")
                .option(ConnectionFactoryOptions.PORT, 1521)
                .option(ConnectionFactoryOptions.DATABASE, "XE")
                .option(ConnectionFactoryOptions.USER, "APP_USER")
                .option(ConnectionFactoryOptions.PASSWORD, "********************")
                .option(Option.valueOf(OracleConnection.CONNECTION_PROPERTY_FAN_ENABLED), "false")
                .build());

With this settings:

application.yaml

spring:
  r2dbc:
    pool:
      initialSize: 10
      maxSize: 50

Around 80% of my application requests ends with the following error:

java.lang.IllegalStateException: This publisher does not support multiple subscribers.
	at oracle.jdbc.datasource.impl.OracleDataSource$ConnectionPublisher.rejectSubscriber(OracleDataSource.java:2676) ~[ojdbc11-21.1.0.0.jar:21.1.0.0.0]
	at oracle.jdbc.datasource.impl.OracleDataSource$ConnectionPublisher.subscribe(OracleDataSource.java:2655) ~[ojdbc11-21.1.0.0.jar:21.1.0.0.0]
	at org.reactivestreams.FlowAdapters$ReactivePublisherFromFlow.subscribe(FlowAdapters.java:355) ~[reactive-streams-1.0.3.jar:na]
	at oracle.r2dbc.impl.OracleReactiveJdbcAdapter.lambda$deferOnce$22(OracleReactiveJdbcAdapter.java:1071) ~[oracle-r2dbc-0.1.0.jar:0.1.0]
	at oracle.r2dbc.impl.OracleReactiveJdbcAdapter$$Lambda$1775/00000000C46540E0.accept(Unknown Source) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:753) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:731) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2172) ~[na:na]
	at oracle.r2dbc.impl.OracleReactiveJdbcAdapter.lambda$deferOnce$23(OracleReactiveJdbcAdapter.java:1070) ~[oracle-r2dbc-0.1.0.jar:0.1.0]
	at oracle.r2dbc.impl.OracleReactiveJdbcAdapter$$Lambda$595/0000000039BC68E0.subscribe(Unknown Source) ~[na:na]
	at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4099) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:208) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8185) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8358) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8155) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8079) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:404) ~[reactor-pool-0.2.3.jar:0.2.3]
	at reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:521) ~[reactor-pool-0.2.3.jar:0.2.3]
	at reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:254) ~[reactor-pool-0.2.3.jar:0.2.3]
	at reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:382) ~[reactor-pool-0.2.3.jar:0.2.3]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:137) ~[reactor-core-3.4.4.jar:3.4.4]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:74) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:137) ~[reactor-core-3.4.4.jar:3.4.4]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:74) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110) ~[reactor-core-3.4.4.jar:3.4.4]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:170) ~[reactor-core-3.4.4.jar:3.4.4]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:67) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:170) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:632) ~[reactor-pool-0.2.3.jar:0.2.3]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:116) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.MonoRetry.subscribeOrReturn(MonoRetry.java:49) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4084) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.4.jar:3.4.4]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) ~[reactor-core-3.4.4.jar:3.4.4]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.4.jar:3.4.4]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95) ~[spring-cloud-sleuth-instrumentation-3.0.0.jar:3.0.0]
	at reactor.core.publisher.Operators.error(Operators.java:197) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4099) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:103) ~[reactor-core-3.4.4.jar:3.4.4]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4099) ~[reactor-core-3.4.4.jar:3.4.4]
	at kotlinx.coroutines.reactive.AwaitKt.awaitOne(Await.kt:137) ~[kotlinx-coroutines-reactive-1.4.3.jar:na]
	at kotlinx.coroutines.reactive.AwaitKt.awaitOne$default(Await.kt:135) ~[kotlinx-coroutines-reactive-1.4.3.jar:na]
	at kotlinx.coroutines.reactive.AwaitKt.awaitFirst(Await.kt:26) ~[kotlinx-coroutines-reactive-1.4.3.jar:na]

Without the connection pool, everything runs fine, but with a low throughput. Am I missing any configuration or is this a known limitation?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.