Giter Site home page Giter Site logo

rxjava-jdbc's Introduction

rxjava-jdbc



Maven Central

Efficient execution, concise code, and functional composition of database calls using JDBC and RxJava Observable.

Status: Released to Maven Central

See also rxjava2-jdbc for RxJava 2.x with non-blocking connection pools!

Release Notes

Features

  • Functionally compose database queries run sequentially or in parallel
  • Queries may be only partially run or indeed never run due to subscription cancellations thus improving efficiency
  • Concise code
  • Queries can depend on completion of other Observables and can be supplied parameters through Observables.
  • Method chaining just leads the way (once you are on top of the RxJava api of course!)
  • All the RxJava goodness!
  • Automatically maps query result rows into typed tuples or your own classes
  • CLOB and BLOB handling is simplified greatly

Maven site reports are here including javadoc.

Table of Contents

Todo

  • Callable statements

Build instructions

git clone https://github.com/davidmoten/rxjava-jdbc.git
cd rxjava-jdbc
mvn clean install

Getting started

Include this maven dependency in your pom (available in Maven Central):

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-jdbc</artifactId>
    <version>VERSION_HERE</version>
</dependency>

After using RxJava on a work project and being very impressed with it (even without Java 8 lambdas!), I wondered what it could offer for JDBC usage. The answer is lots!

Here's a simple example:

Database db = Database.from(url);
List<String> names = db
		.select("select name from person where name > ? order by name")
		.parameter("ALEX")
		.getAs(String.class)
		.toList().toBlocking().single();
System.out.println(names);

output:

[FRED, JOSEPH, MARMADUKE]

Without using rxjava-jdbc:

String sql = "select name from person where name > ? order by name";
try (Connection con = nextConnection();
     PreparedStatement ps = con.prepareStatement(sql);) {
    ps.setObject(1, "ALEX");
    List<String> list = new ArrayList<String>();
    try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            list.add(rs.getString(1));
        }
    }
    System.out.println(list);
} catch (SQLException e) {
    throw new RuntimeException(e);
}

Query types

The Database.select() method is used for

  • SQL select queries.

The Database.update() method is used for

  • update
  • insert
  • delete
  • DDL (like create table, etc)

Examples of all of the above methods are found in the sections below.

Functional composition of JDBC calls

Here's an example, wonderfully brief compared to normal JDBC usage:

import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;

// use composition to find the first person alphabetically with
// a score less than the person with the last name alphabetically
// whose name is not XAVIER. Two threads and connections will be used.

Database db = new Database(connectionProvider);
Observable<Integer> score = db
		.select("select score from person where name <> ? order by name")
		.parameter("XAVIER")
		.getAs(Integer.class)
		.last();
String name = db
		.select("select name from person where score < ? order by name")
		.parameters(score)
		.getAs(String.class)
		.first()
		.toBlocking().single();
assertEquals("FRED", name);

or alternatively using the Observable.compose() method to chain everything in one command:

String name = db
    .select("select score from person where name <> ? order by name")
    .parameter("XAVIER")
    .getAs(Integer.class)
    .last()
    .compose(db.select("select name from person where score < ? order by name")
            .parameterTransformer()
            .getAs(String.class))
    .first()
    .toBlocking().single();

About toBlocking

You'll see toBlocking() used in the examples in this page and in the unit tests but in your application code you should try to avoid using it. The most benefit from the reactive style is obtained by not leaving the monad. That is, stay in Observable land and make the most of it. Chain everything together and leave toBlocking to an endpoint or better still just subscribe with a Subscriber.

Dependencies

You can setup chains of dependencies that will determine the order of running of queries.

To indicate that a query cannot be run before one or more other Observables have been completed use the dependsOn() method. Here's an example:

Observable<Integer> insert = db
		.update("insert into person(name,score) values(?,?)")
		.parameters("JOHN", 45)
		.count()
		.map(Util.<Integer> delay(500));
int count = db
		.select("select name from person")
		.dependsOn(insert)
		.get()
		.count()
		.toBlocking().single();
assertEquals(4, count);

Note that when you pass the output of a query as a parameter to another query there is an implicit dependency established.

Mixing explicit and Observable parameters

Example:

String name= db
	.select("select name from person where name > ?  and score < ? order by name")
	.parameter("BARRY")
	.parameters(Observable.just(100))
	.getAs(String.class)
	.first()
	.toBlocking().single();
assertEquals("FRED",name);

Passing multiple parameter sets to a query

Given a sequence of parameters, each chunk of parameters will be run with the query and the results appended. In the example below there is only one parameter in the sql statement yet two parameters are specified. This causes the statement to be run twice.

List<Integer> list = 
	db.select("select score from person where name=?")
	    .parameter("FRED").parameter("JOSEPH")
		.getAs(Integer.class).toList().toBlocking().single();
assertEquals(Arrays.asList(21,34),list);

Named parameters

Examples:

Observable<String> names = db
    .select("select name from person where score >= :min and score <=:max")
    .parameter("min", 24)
    .parameter("max", 26)
    .getAs(String.class);

Using a map of parameters:

Map<String, Integer> map = new HashMap<String, Integer>();
map.put("min", 24);
map.put("max", 26);
Observable<String> names = db
    .select("select name from person where score >= :min and score <=:max")
    .parameters(map)
    .getAs(String.class);

Using an Observable of maps:

Observable<String> names = db
    .select("select name from person where score >= :min and score <=:max")
    .parameters(Observable.just(map1, map2))
    .getAs(String.class);

Processing a ResultSet

Many operators in rxjava process items pushed to them asynchronously. Given this it is important that ResultSet query results are processed before being emitted to a consuming operator. This means that the select query needs to be passed a function that converts a ResultSet to a result that does not depend on an open java.sql.Connection. Use the get(), getAs(), getTuple?(), and autoMap() methods to specify this function as below.

Observable<Integer> scores = db.select("select score from person where name=?")
	    .parameter("FRED")
		.getAs(Integer.class);

Mapping

A common requirement is to map the rows of a ResultSet to an object. There are two main options: explicit mapping and automap.

Explicit mapping

Using get you can map the ResultSet as you wish:

db.select("select name, score from person")
  .get( rs -> new Person(rs.getString(1), rs.getInt(2)));

Automap

automap does more for you than explicit mapping. You can provide just an annotated interface and objects will be created that implement that interface and types will be converted for you (See Auto mappings section below).

There is some reflection overhead with using auto mapping. Use your own benchmarks to determine if its important to you (the reflection overhead may not be significant compared to the network latencies involved in database calls).

The autoMap method maps result set rows to instances of the class you nominate.

If you nominate an interface then dynamic proxies (a java reflection feature) are used to build instances.

If you nominate a concrete class then the columns of the result set are mapped to parameters in the constructor (again using reflection).

Automap using an interface

Create an annotated interface (introduced in rxjava-jdbc 0.5.8):

public interface Person {

    @Column("name")
    String name();

    @Column("score")
    int score();
}

Then run

Observable<Person> persons = db
                 .select("select name, score from person order by name")
                 .autoMap(Person.class);

Easy eh!

An alternative is to annotate the interface with the indexes of the columns in the result set row:

public interface Person {

    @Index(1)
    String name();

    @Index(2)
    int score();
}

Camel cased method names will be converted to underscore by default (since 0.5.11):

public interface Address {

    @Column // maps to address_id 
    int addressId();

    @Column // maps to full_address
    String fullAddress();
}

You can also specify the sql to be run in the annotation:

@Query("select name, score from person order by name")
public interface Person {

    @Column
    String name();

    @Column
    int score();
}

Then run like this:

Observable<Person> persons = db
                 .select().autoMap(Person.class);

Automap using a concrete class

Given this class:

static class Person {
	private final String name;
	private final double score;
	private final Long dateOfBirth;
	private final Long registered;

	Person(String name, Double score, Long dateOfBirth,
			Long registered) {
			...

Then run

Observable<Person> persons = db
				.select("select name,score,dob,registered from person order by name")
				.autoMap(Person.class);

The main requirement is that the number of columns in the select statement must match the number of columns in a constructor of Person and that the column types can be automatically mapped to the types in the constructor.

Auto mappings

The automatic mappings below of objects are used in the autoMap() method and for typed getAs() calls.

  • java.sql.Date,java.sql.Time,java.sql.Timestamp <==> java.util.Date
  • java.sql.Date,java.sql.Time,java.sql.Timestamp ==> java.lang.Long
  • java.sql.Blob <==> java.io.InputStream, byte[]
  • java.sql.Clob <==> java.io.Reader, String
  • java.math.BigInteger ==> Long, Integer, Decimal, Float, Short, java.math.BigDecimal
  • java.math.BigDecimal ==> Long, Integer, Decimal, Float, Short, java.math.BigInteger

Note that automappings do not occur to primitives so use Long instead of long.

Tuples

Typed tuples can be returned in an Observable:

Tuple2

Tuple2<String, Integer> tuple = db
		.select("select name,score from person where name >? order by name")
		.parameter("ALEX").create()
		.getAs(String.class, Integer.class).last()
		.toBlocking().single();
assertEquals("MARMADUKE", tuple.value1());
assertEquals(25, (int) tuple.value2());

Similarly for Tuple3, Tuple4, Tuple5, Tuple6, Tuple7, and finally

TupleN

TupleN<String> tuple = db
		.select("select name, lower(name) from person order by name")
		.create()
		.getTupleN(String.class).first()
		.toBlocking().single();
assertEquals("FRED", tuple.values().get(0));
assertEquals("fred", tuple.values().get(1));

Returning generated keys

If you insert into a table that say in h2 is of type auto_increment then you don't need to specify a value but you may want to know what value was inserted in the generated key field.

Given a table like this

create table note(
    id bigint auto_increment primary key,
    text varchar(255)
)

This code inserts two rows into the note table and returns the two generated keys:

Observable<Integer> keys = 
    db.update("insert into note(text) values(?)")
      .parameter("hello", "there")
      .returnGeneratedKeys()
      .getAs(Integer.class);

The returnGeneratedKeys method also supports returning multiple keys per row so the builder offers methods just like select to do explicit mapping or auto mapping.

Large objects support

Blob and Clobs are straightforward to handle.

Insert a Clob

Here's how to insert a String value into a Clob (document column below is of type CLOB):

String document = ...
Observable<Integer> count = db
		.update("insert into person_clob(name,document) values(?,?)")
		.parameter("FRED")
		.parameter(Database.toSentinelIfNull(document)).count();

(Note the use of the Database.toSentinelIfNull(String) method to handle the null case correctly)

or using a java.io.Reader:

Reader reader = ...;
Observable<Integer> count = db
		.update("insert into person_clob(name,document) values(?,?)")
		.parameter("FRED")
		.parameter(reader).count();

Insert a Null Clob

This requires either a special call (parameterClob(String)) to identify the parameter as a CLOB:

Observable<Integer> count = db
		.update("insert into person_clob(name,document) values(?,?)")
		.parameter("FRED")
		.parameterClob(null).count();

or use the null Sentinel object for Clobs:

Observable<Integer> count = db
		.update("insert into person_clob(name,document) values(?,?)")
		.parameter("FRED")
		.parameter(Database.NULL_CLOB).count();

or wrap the String parameter with Database.toSentinelIfNull(String) as above in the Insert a Clob section.

Read a Clob

Observable<String> document = db.select("select document from person_clob")
				.getAs(String.class);

or

Observable<Reader> document = db.select("select document from person_clob")
				.getAs(Reader.class);

Insert a Blob

Similarly for Blobs (document column below is of type BLOB):

byte[] bytes = ...
Observable<Integer> count = db
		.update("insert into person_blob(name,document) values(?,?)")
		.parameter("FRED")
		.parameter(Database.toSentinelIfNull(bytes)).count();

Insert a Null Blob

This requires either a special call (parameterBlob(String) to identify the parameter as a CLOB:

Observable<Integer> count = db
		.update("insert into person_blob(name,document) values(?,?)")
		.parameter("FRED")
		.parameterBlob(null).count();

or use the null Sentinel object for Blobs:

Observable<Integer> count = db
		.update("insert into person_clob(name,document) values(?,?)")
		.parameter("FRED")
		.parameter(Database.NULL_BLOB).count();

or wrap the byte[] parameter with Database.toSentinelIfNull(byte[]) as above in the Insert a Blob section.

Read a Blob

Observable<byte[]> document = db.select("select document from person_clob")
				.getAs(byte[].class);

or

Observable<InputStream> document = db.select("select document from person_clob")
				.getAs(InputStream.class);

Compose

Using the Observable.compose() method you can perform multiple queries without breaking method chaining. Observable.compose() requires a Transformer parameter which are available via

  • db.select(sql).parameterTransformer().getXXX()
  • db.select(sql).parameterListTransformer().getXXX()
  • db.select(sql).dependsOnTransformer().getXXX()
  • db.update(sql).parameterTransformer()
  • db.update(sql).parameterListTransformer()
  • db.update(sql).dependsOnTransformer()

Example:

Observable<Integer> score = Observable
    // parameters for coming update
    .just(4, "FRED")
    // update Fred's score to 4
    .compose(db.update("update person set score=? where name=?")
            //parameters are pushed
            .parameterTransformer())
    // update everyone with score of 4 to 14
    .compose(db.update("update person set score=? where score=?")
            .parameters(14, 4)
            //wait for completion of previous observable
            .dependsOnTransformer())
    // get Fred's score
    .compose(db.select("select score from person where name=?")
            .parameters("FRED")
            //wait for completion of previous observable
            .dependsOnTransformer()
			.getAs(Integer.class));

Note that conditional evaluation of a query is obtained using the parameterTransformer() method (no parameters means no query run) whereas using dependsOnTransformer() just waits for the dependency to complete and ignores how many items the dependency emits.

If the query does not require parameters you can push it an empty list and use the parameterListTransformer() to force execution.

Example:

Observable<Integer> rowsAffected = Observable
    //generate two integers
    .range(1,2)
    //replace the integers with empty observables
    .map(toEmpty())
    //execute the update twice with an empty list
    .compose(db.update("update person set score = score + 1")
            .parameterListTransformer())
    // flatten
    .compose(RxUtil.<Integer> flatten())
    // total the affected records
    .compose(SUM_INTEGER);

Transactions

When you want a statement to participate in a transaction then either it should

  • depend on db.beginTransaction()
  • be passed parameters or dependencies through db.beginTransactionOnNext()

Transactions as dependency

Observable<Boolean> begin = db.beginTransaction();
Observable<Integer> updateCount = db
    // set everyones score to 99
    .update("update person set score=?")
    // is within transaction
    .dependsOn(begin)
    // new score
    .parameter(99)
    // execute
    .count();
Observable<Boolean> commit = db.commit(updateCount);
long count = db
    .select("select count(*) from person where score=?")
	// set score
	.parameter(99)
	// depends on
	.dependsOn(commit)
	// return as Long
	.getAs(Long.class)
	// log
	.doOnEach(RxUtil.log())
	// get answer
	.toBlocking().single();
assertEquals(3, count);

onNext Transactions

List<Integer> mins = Observable
    // do 3 times
    .just(11, 12, 13)
    // begin transaction for each item
    .compose(db.beginTransactionOnNext_())
    // update all scores to the item
    .compose(db.update("update person set score=?").parameterTransformer())
    // to empty parameter list
    .map(toEmpty())
    // increase score
    .compose(db.update("update person set score=score + 5").parameterListTransformer())
    //only expect one result so can flatten
    .compose(RxUtil.<Integer>flatten())
    // commit transaction
    .compose(db.commitOnNext_())
    // to empty lists
    .map(toEmpty())
    // return count
    .compose(db.select("select min(score) from person").parameterListTransformer()
            .getAs(Integer.class))
    // list the results
    .toList()
    // block and get
    .toBlocking().single();
assertEquals(Arrays.asList(16, 17, 18), mins);

Note that for each commit* method there is an corresponding rollback method as well.

Asynchronous queries

Unless run within a transaction all queries are synchronous by default. However, if you request an asynchronous version of the database using Database.asynchronous() or if you use asynchronous Transformers then watch out because this means that something like the code below could produce unpredictable results:

Database adb = db.asynchronous();
Observable
    .just(1, 2, 3)
    .compose(adb.update("update person set score = ?")
            .parameterTransformer());

After running this code you have no guarantee that the update person set score=1 ran before the update person set score=2. To run those queries synchronously either use a transaction:

Database adb = db.asynchronous();
Observable
   .just(1, 2, 3)
   .compose(adb.update("update person set score = ?")
           .dependsOn(db.beginTransaction())
           .parameterTransformer())
    .compose(adb.commitOnComplete_());

or use the default version of the Database object that schedules queries using Schedulers.trampoline().

Observable.just(1, 2, 3)
          .compose(db.update("update person set score = ?")
                  .parameterTransformer());

Backpressure

Database.select supports reactive pull backpressure as introduced in RxJava 0.20.0. This means that the pushing of items from the results of a query can be optionally slowed down by the operators downstream to assist in preventing out of memory exceptions or thread starvation.

Logging

Logging is handled by slf4j which bridges to the logging framework of your choice. Add the dependency for your logging framework as a maven dependency and you are sorted. See the test scoped log4j example in rxjava-jdbc/pom.xml.

Database Connection Pools

Include the dependency below:

<dependency>
    <groupId>com.zaxxer</groupId>
    <artifactId>HikariCP-java6</artifactId>
    <version>2.3.2</version>
</dependency>

and you can use a Hikari database connection pool like so:

Database db = Database.builder().url(url).pool(minPoolSize,maxPoolSize).build();

Once finished with a Database that has used a connection pool you should call

db.close();

This will close the connection pool and release its resources.

Using a custom connection pool

If Hikari doesn't suit you or you have container imposed constraints this is how you can use a different connection pool.

Write an implmentation of the ConnectionProvider interface (two methods, getConnection() and close()) and use it like so:

ConnectionProvider cp = new CustomConnectionProvider();
Database db = Database.builder().connectionProvider(cp).build();

This method could be used to supply a JNDI datasource for example.

Use a single Connection

A Database can be instantiated from a single java.sql.Connection which will be used for all queries in companion with the current thread Scheduler (Schedulers.trampoline()). The connection is wrapped in a ConnectionNonClosing which suppresses close calls so that the connection will still be open for all queries and will remain open after use of the Database object.

Example:

Database db = Database.from(con);

Fetch Size

The fetch size setting in statements allows to specify how many rows should be fetched from the database at once. In other words, instead of fetching all data in the ResultSet at once, potentially consuming a lot of memory in the heap, the fetch size setting allows to trade time, due to multiple round-trips to the database, in exchange for lower memory consumption.

Example:

db
    .select("select * from person")
    // set fetch size
    .fetchSize(10)
    //
    .autoMap(Person.class)
    // 
    .take(20)
	// log
	.doOnEach(RxUtil.log());

In this case, the JDBC driver will do two round trips, each time fetching 10 rows and transforming each row to an instance of Person.

Note for SQLite Users

rxjava-jdbc does support SQLite. But due to the SQLite architecture there are limitations particularly with write operations (CREATE, INSERT, UPDATE, DELETE). If your application has any write operations, use a single connection. If a source Observable pushes emissions through a series of database read/write operations, always collect emissions and flatten them between each database read/write operation. This will prevent a SQLITE_INTERRUPT exception by never having more than one query open at a time.

Observable<MyItem> = Observable.just(itemstoInsert)
		.compose(executeInsertsAndGetKeys())
		.toList().concatMap(Observable::from)
		.compose(selectAndAutoMap());

rxjava-jdbc's People

Contributors

alexeybuzdin avatar bryant1410 avatar davidmoten avatar dependabot[bot] avatar goznauk avatar lukaseder avatar michele-mancioppi avatar sjaakieb avatar sparty02 avatar taher-ghaleb avatar thomasnield avatar usultis avatar vtsyryuk avatar zsiegel avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rxjava-jdbc's Issues

Integrate with coveralls.io

Integrate with coveralls to report on code coverage of unit tests via continuous integration.

Two initial things are needed to make this happen (@davidmoten would have to tackle these):

  1. Establish the rxjava-jdbc repo at coveralls.io. When you sign in with your github creds, it'll be obvious what to do.
  2. Add an environment variable in travis called "COVERALLS_REPO_TOKEN". The value for this will be provided after you establish the repo in coveralls.

sql in operator

Hey David,

Is there a way to use sql in operator in rxjava-jdbc? I would like to use it in a way that enables me to pass a List (or Set) to the builder which substitutes it for my named parameter in the sql string. If it is not possible, what is the best way to use sql in operator? I tried to google it but no result...

Thanks,
Zoltรกn

RxJava 2.0 Planning?

Hey @davidmoten,

This is probably going to be a somewhat large refactoring task. What are your plans with RxJava 2.0 being on the horizon? Will there be two versions of this codebase on two branches? With two separate 1.0.x and 2.0.x releases?

Tom

C3P0 0.9.5 release not closing connections

Hey,

In your documentation, you say you should use a newer version than the pre-8 release of c3p0. But in the latest (0.9.5, which is after pre8), connections don't seem to be closed when done. My connection pool grows with each query, making it unusable.

I would really like to use pooled connections. Is there any (quick) fix?

java.lang.IllegalStateException: more items arrived than were requested

Hi,

I'm trying to use rxjava-jdbc to query N elements from my db on a regular basis and then query for each of those elements an external service.

That external service is limited to a certain rate so I have tested what is explained in Constraining a stream of events in Rx to a maximum rate to limit the pace of my observable.

My first test was without rxjava-jdbc and works fine:

    Long start = System.currentTimeMillis();
    CountDownLatch latch = new CountDownLatch(1);
    Observable
        .interval(0, 1, TimeUnit.MILLISECONDS)
        .onBackpressureLatest()
        .doOnNext(t -> System.out.println("start " + t))
        .flatMap(i -> Observable.just(i + "A", i + "B"))
        .doOnNext(t -> System.out.println("result " + t))
        .concatMap(i -> Observable.empty()
            .delay(100, TimeUnit.MILLISECONDS)
            .startWith(i)
        )
        .subscribe(
            i -> {
                System.out.println("item " + i + " received at " + (System.currentTimeMillis() - start) + "ms");
            },
            t -> {
                System.out.println("Something went wrong ");
                t.printStackTrace();
                latch.countDown();
            },
            latch::countDown
        );

    try {
        latch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

This prints out:

start 0
result 0A
item 0A received at 70ms
result 0B
start 1
start 2
start 3
...
start 106
item 0B received at 175ms
start 107
start 108
result 1A
start 109
start 110
...
start 128
item 1A received at 280ms
result 1B
start 215
item 1B received at 385ms
result 2A
item 2A received at 490ms
result 2B
start 421
item 2B received at 591ms
result 3A
item 3A received at 696ms
result 3B
start 631
...

So as you can see everything works fine and BackPressureLast activates.

Now I tried to replace the flatMap by a query to the database which should behave the same :

    Long start = System.currentTimeMillis();
    CountDownLatch latch = new CountDownLatch(1);
    Observable
        .interval(0, 1, TimeUnit.MILLISECONDS)
        .onBackpressureLatest()
        .doOnNext(t -> System.out.println("start " + t))
        .lift(db
            .select("SELECT v1.v || v2.v FROM (VALUES (?)) AS v1(v), (VALUES ('A'), ('B')) AS v2(v)")
            .parameterOperator()
            .getAs(String.class)
        )
        .doOnNext(t -> System.out.println("result " + t))
        .concatMap(i -> Observable.empty()
            .delay(100, TimeUnit.MILLISECONDS)
            .startWith(i)
        )
        .subscribe(
            i -> {
                System.out.println("item " + i + " received at " + (System.currentTimeMillis() - start) + "ms");
            },
            t -> {
                System.out.println("Something went wrong ");
                t.printStackTrace();
                latch.countDown();
            },
            latch::countDown
        );

    try {
        latch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

Well, here's what I get:

start 0
result 0A
item 0A received at 463ms
result 0B
start 1
start 2
start 3
start 4
Something went wrong
java.lang.IllegalStateException: more items arrived than were requested
    at rx.internal.producers.ProducerArbiter.produced(ProducerArbiter.java:98)
    at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:208)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
    at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
    at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:206)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
    at com.github.davidmoten.rx.subjects.PublishSubjectSingleSubscriber.onNext(PublishSubjectSingleSubscriber.java:58)
    at rx.observers.Subscribers$1.onNext(Subscribers.java:67)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
    at rx.internal.operators.OperatorOnBackpressureLatest$LatestEmitter.emit(OperatorOnBackpressureLatest.java:163)
    at rx.internal.operators.OperatorOnBackpressureLatest$LatestEmitter.onNext(OperatorOnBackpressureLatest.java:129)
    at rx.internal.operators.OperatorOnBackpressureLatest$LatestSubscriber.onNext(OperatorOnBackpressureLatest.java:209)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
    at rx.Scheduler$Worker$1.call(Scheduler.java:120)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.github.davidmoten.rx.jdbc.Parameter.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:104)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:99)
    ... 19 more

After that there's no more output.

Am I doing something wrong here ?
Thanks

Regards,
Crystark

Feat: Stored Procedures

(I saw a comment on the README along the lines of supporting Callable statements. Assuming this is where you were heading with that?)

Maybe something like:

String state = db.
        .call("{call GetStateByZip(?)}")
        .parameter(90210)
        .getAs(String.class)
        .first().toBlocking().single();
System.out.println(state);  // California

Stored procedures would support named parameters, as already supported in other queries.

Query with "... like ''%?%" gets empty parameters

On the 0.6.9-SNAPSHOT as of latest master c09bad3 (w/ #47 fixed) I get empty paramagnets for queries containing like '%?%'

In a query

db
  .select("SELECT * FROM table_name WHERE column_name like '%?%'")
  .parameter(value)
  ...

the debug output looks like this

QuerySelectOnSubscribe:106 - preparing statement,sql=SELECT * FROM table_name WHERE column_name like '%?%'
QuerySelectOnSubscribe:125 - executing sql=SELECT * FROM table_name WHERE column_name like '%?%', parameters []

This query works as fine and actual parameters logged well

SELECT * FROM table_name WHERE column_name like ?

This one does not work again

SELECT * FROM viewer_urls WHERE lower(showcases) like '?'

Could you please point if I'm doing something wrong here?

Problems with SQLite and File-locking

Hi David,
I've been playing a lot with SQLite and RxJava-JDBC this weekend. This may just be a limitation of SQLite and the fact it is serverless and file-based, so obviously it has a limitation with concurrent file-locking .

Suppose I create a SQLite database with these two simple tables, where TABLE_ONE is the parent and TABLE_TWO is the child.

CREATE TABLE TABLE_ONE (
    ID    INTEGER PRIMARY KEY
                  NOT NULL,
    VALUE INTEGER NOT NULL
);

CREATE TABLE TABLE_TWO (
    ID         INTEGER NOT NULL
                       PRIMARY KEY,
    FOREIGN_ID INTEGER NOT NULL
                       REFERENCES TABLE_ONE ([KEY]),
    VALUE      INTEGER NOT NULL
);

I then create an RxJava-JDBC procedure to insert a record into TABLE_ONE, and reactively take the new ID and use it to populate a TABLE_TWO record.

public final class Launcher {

    public static void main(String[] args) {

       final Database db = Database.builder()
                .url("jdbc:sqlite:C:/Users/Thomas/test.db")
                .pool(1, 5)
                .build();

        db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (100)")
                .returnGeneratedKeys()
                .getAs(Integer.class)
                .flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
                        .parameter(k)
                        .get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
                ).flatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
                        .parameter(t1.id)
                        .parameter(t1.value)
                        .returnGeneratedKeys()
                        .getAs(Integer.class)
                ).flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
                        .parameter(k)
                        .get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))

        ).subscribe(System.out::println, Throwable::printStackTrace);
    }
    private static final class Type1 {
        private final int id;
        private final int value;
        private Type1(int id, int value) {
            this.id = id;
            this.value = value;
        }
    }
    private static final class Type2 {
        private final int id;
        private final int foreignId;
        private final int value;
        private Type2(int id, int foreignKey, int value) {
            this.id = id;
            this.foreignId = foreignKey;
            this.value = value;
        }
        @Override
        public String toString() {
            return "Type2{" +
                    "id=" + id +
                    ", foreignId=" + foreignId +
                    ", value=" + value +
                    '}';
        }
    }
}

The insertion for TABLE_ONE executes successfully. But when the second insertion for TABLE_TWO occurs I get a classic SQLite [SQLITE_BUSY] error.

Caused by: java.sql.SQLException: [SQLITE_BUSY]  The database file is locked (database is locked)
    at org.sqlite.core.DB.newSQLException(DB.java:890)
    at org.sqlite.core.DB.newSQLException(DB.java:901)
    at org.sqlite.core.DB.execute(DB.java:807)
    at org.sqlite.core.DB.executeUpdate(DB.java:847)
    at org.sqlite.jdbc3.JDBC3PreparedStatement.executeUpdate(JDBC3PreparedStatement.java:86)
    at com.zaxxer.hikari.proxy.PreparedStatementProxy.executeUpdate(PreparedStatementProxy.java:61)
    at com.zaxxer.hikari.proxy.HikariPreparedStatementProxy.executeUpdate(HikariPreparedStatementProxy.java)
    at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.performUpdate(QueryUpdateOnSubscribe.java:225)
    ... 309 more

I had a theory that the monad was to blame and the onCompleted() was never called after the first INSERT, causing the connection/file to still be occupied and locked. So on the second INSERT, the statement was not yet closed from the first INSERT, causing the [SQLITE_BUSY] error.

I think my theory proved correct. When I blocked the first INSERT and pulled the resulting first object out of the monad, I was able to successfully use it to drive the second INSERT.

       final Database db = Database.builder()
                .url("jdbc:sqlite:C:/Users/Thomas/test.db")
                .pool(1, 5)
                .build();

        Type1 type1 = db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (100)")
                .returnGeneratedKeys()
                .getAs(Integer.class)
                .flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
                        .parameter(k)
                        .get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
                ).toBlocking().first();

                db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
                        .parameter(type1.id)
                        .parameter(type1.value)
                        .returnGeneratedKeys()
                        .getAs(Integer.class)
                        .flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
                                .parameter(k)
                                .get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
                        ).subscribe(System.out::println, Throwable::printStackTrace);

Obviously this is not an optimal workaround because we don't want to break the monad. Considering SQLite is such a ubiquitous database solution especially with Android developers, I propose we remedy this if possible. I can try to help if we want to investigate this.

What do you think?

Is Async by default a good idea?

Rather than requesting a synchronous version of a Database using Database.synchronous perhaps we should be synchronous by default with the option of async via Database.asynchronous? Would match the Rx Design Guidelines a bit better.

About the Database.asynchronous() method

Hi gays๏ผš
I user rxNetty + rxjava-jdbc for a project,when I use the asynchronous api for ab test
like ab -k -n 100000 -c 1000 'http://localhost:8080/aa?user_id=4'.
It will create so many threads at last throw OutOfMemoryError

Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

*.I din't know why,it's something in my code ?
On other hand
Is that create some new api for set an new Scheduler by user ,make user can control what kind Scheduler to use
some thing

 Database.asynchronous(Func0<Scheduler> asynScheduler) 

here is problem code

public Observable<User> findUser(int id) {
    return db.asynchronous().select("select * from users where id = ?").parameter(id).autoMap(User.class);
  }

In rxNetty server

RxNetty.newHttpServerBuilder(8080, new RequestHandler<ByteBuf, ByteBuf>() {
      @Override
      public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
        request.getQueryParameters().get("user_id").get(0);
       int user_id= Ints.tryParse(Iterables.getFirst(request.getQueryParameters().get("user_id"),"1"));
        return us.findUser(user_id)
          .map(new Func1<User, String>() {
            @Override
            public String call(User user) {
              return JSON.toJSONString(user);
            }
          })
          .flatMap(new Func1<String, Observable<Void>>() {
            @Override
            public Observable<Void> call(String s) {
              response.getHeaders().add(HttpHeaders.Names.CONTENT_LENGTH, s.getBytes().length);
              response.writeStringAndFlush(s);
//              System.out.printf("in flat map thread [%s] \n",Thread.currentThread().getName());
              return response.close();
            }
          });
      }
    }).eventLoop(new EpollEventLoopGroup()).channel(EpollServerSocketChannel.class).build().startAndWait();

Lift with parameterListOperator swallows errors ?

Hi,

Consider the following code:

obs
    .map(t -> getParam3(t)
        .flatMap(p3 -> Observable.<Object> just(t.param1, t.param2, p3))
    )
    .lift(db
        .update(query)
        .parameterListOperator()
    )
    .onErrorResumeNext((t) -> {
        L.warn("Something went wrong.", t);
        return Observable.empty();
    });

If getParam3 returns an observable containing an error, I'll never see it unless i put some error handling directly on that observable. I suppose parameterListOperator swallows the error.

I hope you can fix this.
Thanks !

batch updates?

Hi!

Love the look of this, but is there a way to do batch updates?

Exception handling

I think something's wrong with the exception handling. This code does not generate any errors, it returns an empty Observable. doOnError is never called.

    db.select("select name, score from person order by name")
            .getAs(Person.class)  // mistakenly replaced autoMap by getAs
            .doOnError(Throwable::printStackTrace)
            .forEach(System.out::println);

In reality a ClassCastException occurs (String.class cannot be cast to Person.class), but this programming is completely hidden to the developer. This makes using and debugging this library very hard.

Please consider support object mapping friendly CURD Methods

Since there is already support for this:
https://github.com/davidmoten/rxjava-jdbc#auto-mappings

int keyId = 
    db.update("insert into Person (pid, age, dob, sex, name) values (null, ?, ?, ?, ?) ")
      .parameters(person.getAge(), person.getDob(), person.isMale(), person.getName())
      .returnGeneratedKeys()
      .getAs(Integer.class)
      .toBlocking().single();

Please consider support for a concrete class:

int keyId = 
    db.insert(person)  // << Object AutoMapped Person for insert.
      .returnGeneratedKeys()
      .getAs(Integer.class)
      .toBlocking().single();

Such support will reduce the amount of manual refactoring done for remodeling of Person object which rename or adding fields.

Subscribe seems to be blocking

Hello,
I'm facing an issue when subscribing to returned Observable from rxjava-jdbc. When I'm subscribing to the Observable stream it seems that subscribe() method is blocking.
See the following code:

Observable<String> score = db.select("select name from users").getAs(String.class);
score.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("onCompleted"));
System.out.println("after subscribe");

I'm just doing a basic select query and after the subscribe I'm printing some message. But for some reason the message appears only after the query executes and "onCompleted" is printed.

I don't call .toBlocking() as in all your examples. However, I see that the subscribe() call is blocking.
I've checked this functionality with both mySQL driver ("com.mysql.jdbc.Driver") and Hive driver ( "org.apache.hive.jdbc.HiveDriver" ) and I see the same behavior - the subscribe() call is blocking.

Am I doing something wrong?
Thanks!

Question: Interop with JTA?

Does rxjava-jdbc's transaction support interop with JTA? In other words, will it transparently participate in existing JTA initiated transactions? Still trying to wrap my head around the 'push' model of observables and not sure if this is possible when inverting the control over to observable streams.

In the use case I'm thinking of, I wouldn't (can't) explicitly commit/rollback/etc in JDBC, but rather let the higher level transaction manager do it on my behalf (whether that be through a UserTransaction API or via declarative transaction demarcation).

Hopefully that makes sense!

Feature: Load data in slices(pagination under hood)

Considering the fact this library has to be used in Rx architecture, there comes the need to load and process data in slices (pagination).

Lets say you have to process a million of records so you cannot handle all this stuff just using one 'select' operation. You have to load data in bits and process just few rows at time and then load next few rows and process ...

Connections not released after each requests ?

Hi,

I'm having some problem on this observable. I'm using rxjava-jdbc 0.6.8 with a min size 1 and a max size 2 for the Hikari pool.

updateObservable = Observable.interval(2, 2, TimeUnit.SECONDS)
    .onBackpressureLatest()
    .flatMap(i -> db
        .select("SELECT * FROM elements")
        .autoMap(Element.class)
    )
    .compose(service.asTransformer())
    .doOnNext(t -> System.out.println("updated " + t))
    .map(e -> Observable.<Object> just(e.part1, e.part2, new Timestamp(e.updated_at), e.id))
    .lift(db
        .update("UPDATE elements SET part1 = ?, part2 = ?, updated_at = ? WHERE id = ?")
        .parameterListOperator()
    )
    .compose(Observable::merge)
    .doOnNext(t -> System.out.println("done " + t))
    .retry((i, t) -> {
        if (i < RETRIES) {
            L.warn("Something went wrong. Retry (" + i + "/" + RETRIES + ")...", t);
            return true;
        }
        else {
            L.error("Something is still wrong after " + RETRIES + " retries. Fix it. " + NAME + " stopped.", t);
            return false;
        }
    })
    .onErrorResumeNext(t -> Observable.empty());

Here are the logs this shows

updated Element [id=169900]
done 1
updated Element [id=169901]
2015-12-11 11:38:59 WARN  ElementUpdater:66 - Something went wrong. Retry (1/100)...
com.github.davidmoten.rx.jdbc.exceptions.SQLRuntimeException: java.sql.SQLTransientConnectionException: HikariPool-0 - Connection is not available, request timed out after 30000ms.
        at com.github.davidmoten.rx.jdbc.ConnectionProviderPooled.get(ConnectionProviderPooled.java:92)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.getConnection(QueryUpdateOnSubscribe.java:126)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:79)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:21)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
        at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:77)
        at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:56)
        at rx.internal.operators.OperatorSubscribeOn$1.onNext(OperatorSubscribeOn.java:57)
        at rx.internal.operators.OperatorSubscribeOn$1.onNext(OperatorSubscribeOn.java:43)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.operators.OperatorBufferWithSize$1.onNext(OperatorBufferWithSize.java:104)
        at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
        at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
        at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
        at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
        at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.setProducer(OperatorConcat.java:222)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
        at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
        at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
        at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
        at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
        at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
        at rx.Subscriber.setProducer(Subscriber.java:211)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
        at rx.Subscriber.setProducer(Subscriber.java:211)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:232)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
        at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onNext(OperatorOnErrorResumeNextViaFunction.java:111)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
        at io.reactivex.netty.protocol.http.UnicastContentSubject$AutoReleaseByteBufOperator$1.onNext(UnicastContentSubject.java:262)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.BufferUntilSubscriber.emit(BufferUntilSubscriber.java:145)
        at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:178)
        at io.reactivex.netty.protocol.http.UnicastContentSubject.onNext(UnicastContentSubject.java:286)
        at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter.invokeContentOnNext(ClientRequestResponseConverter.java:248)
        at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter.channelRead(ClientRequestResponseConverter.java:141)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:283)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLTransientConnectionException: HikariPool-0 - Connection is not available, request timed out after 30000ms.
        at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:196)
        at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148)
        at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:99)
        at com.github.davidmoten.rx.jdbc.ConnectionProviderPooled.get(ConnectionProviderPooled.java:90)
        ... 123 more
updated Element [id=169901]
done 1
updated Element [id=169902]
done 1
updated Element [id=169903]
2015-12-11 11:39:35 WARN  ElementUpdater:66 - Something went wrong. Retry (2/100)...
com.github.davidmoten.rx.jdbc.exceptions.SQLRuntimeException: java.sql.SQLTransientConnectionException: HikariPool-0 - Connection is not available, request timed out after 30000ms.
        [...] Same stack trace
updated Element [id=169903]
done 1
...

So each time it does 2 requests, it freezes and times out at 30 seconds (I guess cause all the connections are used). It then retries , succeeds at the id it failed on, succeeds at the next and freezes once again when already 2 requests have been done. And so on.

I must not be understanding something about the pools here. Please let me know if i'm doing something wrong.

Issues with Teradata?

I'm not sure whether to post this problem here or in the HikariCP page. I am trying to use Teradata which works just fine as shown below. (EDIT: Not fine, after iterating records it acts like onCompleted() is never called)

Database.builder()
.url("jdbc:teradata://CSEDW/database=edw_prod_vw,logmech=TD2")
.username(myUserName)
.password(myPassword)

But the moment I try to pool() the connections I start to have problems

Database.builder()
.url("jdbc:teradata://CSEDW/database=my_db_env,logmech=TD2")
.username(myUserName)
.password(myPassword)
.pool(2,5);

I get the following stacktrace

com.zaxxer.hikari.pool.PoolInitializationException: Exception during pool initialization: Connection.isValid() is not supported, configure connection test query.
    at com.zaxxer.hikari.pool.HikariPool.initializeConnections(HikariPool.java:581)
    at com.zaxxer.hikari.pool.HikariPool.<init>(HikariPool.java:152)
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:96)
    at com.github.davidmoten.rx.jdbc.ConnectionProviderPooled.get(ConnectionProviderPooled.java:80)
    at com.github.davidmoten.rx.jdbc.QuerySelectOnSubscribe.connectAndPrepareStatement(QuerySelectOnSubscribe.java:105)
    at com.github.davidmoten.rx.jdbc.QuerySelectOnSubscribe.call(QuerySelectOnSubscribe.java:64)
    at com.github.davidmoten.rx.jdbc.QuerySelectOnSubscribe.call(QuerySelectOnSubscribe.java:19)
    at rx.Observable.unsafeSubscribe(Observable.java:7531)
    at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
    at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:80)
    at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:59)
    at rx.internal.operators.OperatorSubscribeOn$1.onNext(OperatorSubscribeOn.java:57)
    at rx.internal.operators.OperatorSubscribeOn$1.onNext(OperatorSubscribeOn.java:43)
    at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:43)
    at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:32)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable.unsafeSubscribe(Observable.java:7531)
    at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:176)
    at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:140)
    at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:159)
    at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
    at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:206)
    at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:43)
    at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:32)
    at rx.Observable.unsafeSubscribe(Observable.java:7531)
    at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:176)
    at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:140)
    at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:106)
    at rx.Subscriber.setProducer(Subscriber.java:177)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:50)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable.subscribe(Observable.java:7621)
    at rx.Observable.subscribe(Observable.java:7397)
    at com.swa.rm.od.AirportConversionMapping.main(AirportConversionMapping.java:32)
Caused by: java.sql.SQLException: Connection.isValid() is not supported, configure connection test query.
    at com.zaxxer.hikari.pool.PoolElf.setupConnection(PoolElf.java:178)
    at com.zaxxer.hikari.pool.HikariPool.addConnection(HikariPool.java:497)
    at com.zaxxer.hikari.pool.HikariPool.initializeConnections(HikariPool.java:565)
    ... 46 more

How do I configure Teradata into this?

(auto)closeable Database?

Hi,

Can you make Database to implement AutoCloseable interface?

So, we can use inside with try-with-resources expression.

Thanks

Checking database state

Hi, I am experimenting with this library and javarx, so far I didn't find a method to check whether Database objects are connected or not (for example: no internet connection, db server is down, etc). I guess it can handled with this observable pattern somehow.

Also, besides the README, is there any other document or guide I could use?

To how get Last inserted Id from MySQL?

How do I get last insert id from mysql?

database.update("insert into person(name,sex,age,dob) values(?,?,?,?)")
.parameters(person.getName(), person.isMale(), person.getAge(), person.getDob());

I prefer to use the NonBlocking Reactive Pattern.

In JDBC I could so something like this:

Statement stmt = db.prepareStatement(query, Statement.RETURN_GENERATED_KEYS);
stmt.executeUpdate();

ResultSet rs = stmt.getGeneratedKeys();
if (rs.next()){
    result=rs.getInt(1);
}

I have also posted this issue in Stackoverflow. It is a common features used in JDBC programming.
http://stackoverflow.com/questions/30610393/rxjava-jdbc-0-5-7-how-to-obtain-last-insert-id

Handle setting of nulls for clobs and blobs

Currently rs.setObject(null) is used but with h2 test database and probably others this is not permitted with clob and blob. Will need to use rs.setNull() method instead but then need to know type of parameter...

NPE issue with batching?

Hey David,

My colleague is doing some work with the batching and he seems to have an issue that throws an NPE. He is using MySQL, and if he disables batching it works fine. See details below:

Does not work (produces the error java.lang.NullPointerException at com.github.davidmoten.rx.jdbc.ConnectionProviderBatch.close(ConnectionProviderBatch.java:24) atโ€ฆ) :

db.update(insertSql)
        .batchSize(1000)
        .parameters(params)
        .count()

Works (inserts records 1 at a time):

db.update(ResourceFile.toString(insertSql)
        //.batchSize(1000)
        .parameters(params)
        .count()

Full stack trace

java.lang.NullPointerException
at com.github.davidmoten.rx.jdbc.ConnectionProviderBatch.close(ConnectionProviderBatch.java:24)
at com.github.davidmoten.rx.jdbc.Database.endTransactionObserve(Database.java:687)
at com.github.davidmoten.rx.jdbc.QueryContext.endTransactionObserve(QueryContext.java:57)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:91)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:22)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:73)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:52)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at com.github.davidmoten.rx.internal.operators.OnSubscribeMapLast$MapLastSubscriber.onNext(OnSubscribeMapLast.java:97)
at rx.internal.operators.OperatorBufferWithSize$BufferExact.onNext(OperatorBufferWithSize.java:114)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:117)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:89)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at com.github.davidmoten.rx.internal.operators.OperatorFromTransformer$ParentSubscriber.onNext(OperatorFromTransformer.java:93)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:399)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:357)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:852)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:117)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:89)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:100)
at rx.internal.operators.DeferredScalarSubscriber.onCompleted(DeferredScalarSubscriber.java:73)
at rx.internal.operators.DeferredScalarSubscriberSafe.onCompleted(DeferredScalarSubscriberSafe.java:54)
at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
at rx.internal.operators.OnSubscribeRedo$4$1.onCompleted(OnSubscribeRedo.java:321)
at rx.internal.operators.OperatorDematerialize$1.onCompleted(OperatorDematerialize.java:85)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:298)
at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:284)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:152)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext(SubjectSubscriptionManager.java:255)
at rx.subjects.BehaviorSubject.onNext(BehaviorSubject.java:162)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
at rx.internal.operators.OnSubscribeRedo$2$1.onCompleted(OnSubscribeRedo.java:228)
at com.swa.rm.common.util.OperatorEmissionCounter$call$parent$1.onCompleted(RxOperators.kt:210)
at com.swa.rm.common.util.OperatorEmissionCounter$call$parent$1.onCompleted(RxOperators.kt:210)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70)
at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:177)
at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:64)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:246)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onCompleted(OnSubscribeConcatMap.java:169)
at rx.internal.operators.OperatorBufferWithSize$BufferExact.onCompleted(OperatorBufferWithSize.java:130)
at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:177)
at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:64)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:246)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onCompleted(OnSubscribeConcatMap.java:169)
at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:177)
at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:64)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:246)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onCompleted(OnSubscribeConcatMap.java:169)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:106)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeRedo$2.call(OnSubscribeRedo.java:273)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:73)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:52)
at rx.internal.operators.OnSubscribeRedo$5.request(OnSubscribeRedo.java:361)
at rx.internal.operators.DeferredScalarSubscriber.setProducer(DeferredScalarSubscriber.java:143)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:353)
at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:47)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.DeferredScalarSubscriber.subscribeTo(DeferredScalarSubscriber.java:153)
at rx.internal.operators.OnSubscribeCollect.call(OnSubscribeCollect.java:50)
at rx.internal.operators.OnSubscribeCollect.call(OnSubscribeCollect.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.DeferredScalarSubscriber.subscribeTo(DeferredScalarSubscriber.java:153)
at rx.internal.operators.OnSubscribeReduceSeed.call(OnSubscribeReduceSeed.java:40)
at rx.internal.operators.OnSubscribeReduceSeed.call(OnSubscribeReduceSeed.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:100)
at rx.internal.operators.DeferredScalarSubscriber.onCompleted(DeferredScalarSubscriber.java:73)
at rx.internal.operators.DeferredScalarSubscriberSafe.onCompleted(DeferredScalarSubscriberSafe.java:54)
at rx.observers.Subscribers$5.onCompleted(Subscribers.java:225)
at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:177)
at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:64)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:246)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onCompleted(OnSubscribeConcatMap.java:169)
at rx.internal.operators.OperatorBufferWithSize$BufferExact.onCompleted(OperatorBufferWithSize.java:130)
at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:177)
at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:64)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:246)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onCompleted(OnSubscribeConcatMap.java:169)
at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:177)
at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:64)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:246)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onCompleted(OnSubscribeConcatMap.java:169)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:106)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.DeferredScalarSubscriber.subscribeTo(DeferredScalarSubscriber.java:153)
at rx.internal.operators.OnSubscribeCollect.call(OnSubscribeCollect.java:50)
at rx.internal.operators.OnSubscribeCollect.call(OnSubscribeCollect.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.util.ScalarSynchronousObservable$3.call(ScalarSynchronousObservable.java:235)
at rx.internal.util.ScalarSynchronousObservable$3.call(ScalarSynchronousObservable.java:228)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:399)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:357)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:852)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
at rx.internal.operators.OperatorDistinct$1.onNext(OperatorDistinct.java:63)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:399)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:357)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:852)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:115)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:76)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.request(QuerySelectProducer.java:48)
at rx.internal.operators.OperatorSubscribeOn$1$1$1.request(OperatorSubscribeOn.java:80)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OperatorSubscribeOn$1$1.setProducer(OperatorSubscribeOn.java:76)
at com.github.davidmoten.rx.jdbc.QuerySelectOnSubscribe.call(QuerySelectOnSubscribe.java:69)
at com.github.davidmoten.rx.jdbc.QuerySelectOnSubscribe.call(QuerySelectOnSubscribe.java:20)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:73)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:52)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerScalarProducer.request(OnSubscribeConcatMap.java:366)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:278)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.fastPath(OnSubscribeFromArray.java:76)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:58)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:399)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:357)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:852)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.performUpdate(QueryUpdateOnSubscribe.java:259)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:88)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:22)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:73)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:52)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OperatorBufferWithSize$BufferExact.onNext(OperatorBufferWithSize.java:114)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:117)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:89)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:138)
at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:129)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.performUpdate(QueryUpdateOnSubscribe.java:259)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:88)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:22)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:73)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:52)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OperatorBufferWithSize$BufferExact.onNext(OperatorBufferWithSize.java:114)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:117)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:89)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:138)
at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:129)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.DeferredScalarSubscriber.subscribeTo(DeferredScalarSubscriber.java:153)
at rx.internal.operators.OnSubscribeReduceSeed.call(OnSubscribeReduceSeed.java:40)
at rx.internal.operators.OnSubscribeReduceSeed.call(OnSubscribeReduceSeed.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe$2.onNext(QueryUpdateOnSubscribe.java:279)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:115)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestAll(QuerySelectProducer.java:56)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.request(QuerySelectProducer.java:46)
at rx.internal.operators.OperatorSubscribeOn$1$1$1.request(OperatorSubscribeOn.java:80)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OperatorSubscribeOn$1$1.setProducer(OperatorSubscribeOn.java:76)
at com.github.davidmoten.rx.jdbc.QuerySelectOnSubscribe.call(QuerySelectOnSubscribe.java:69)
at com.github.davidmoten.rx.jdbc.QuerySelectOnSubscribe.call(QuerySelectOnSubscribe.java:20)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:73)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:52)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OperatorBufferWithSize$BufferExact.onNext(OperatorBufferWithSize.java:114)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:117)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:89)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerScalarProducer.request(OnSubscribeConcatMap.java:366)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:278)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.performUpdate(QueryUpdateOnSubscribe.java:247)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:88)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:22)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:73)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:52)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OperatorBufferWithSize$BufferExact.onNext(OperatorBufferWithSize.java:114)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:117)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:89)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:138)
at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:129)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.util.ScalarSynchronousObservable$3.call(ScalarSynchronousObservable.java:235)
at rx.internal.util.ScalarSynchronousObservable$3.call(ScalarSynchronousObservable.java:228)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:399)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:357)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:852)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.setProducer(OnSubscribeFilter.java:104)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:45)
at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:399)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:357)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:852)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.onNext(OnSubscribeConcatMap.java:335)
at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:115)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:76)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.request(QuerySelectProducer.java:48)
at rx.internal.operators.OperatorSubscribeOn$1$1$1.request(OperatorSubscribeOn.java:80)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerSubscriber.setProducer(OnSubscribeConcatMap.java:329)
at rx.internal.operators.OperatorSubscribeOn$1$1.setProducer(OperatorSubscribeOn.java:76)
at com.github.davidmoten.rx.jdbc.QuerySelectOnSubscribe.call(QuerySelectOnSubscribe.java:69)
at com.github.davidmoten.rx.jdbc.QuerySelectOnSubscribe.call(QuerySelectOnSubscribe.java:20)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:73)
at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:52)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerScalarProducer.request(OnSubscribeConcatMap.java:366)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:278)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:45)
at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10211)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:250)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:147)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
at rx.Scheduler$Worker$1.call(Scheduler.java:137)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745) 

db.select(sql).dependsOnOperator().getXXX() doesn't seem to work

Hi,

I have the following piece of code that doesn't seem to work as I want:

updateObservable = Observable.interval(0, 10, TimeUnit.SECONDS)
    .doOnNext(s -> System.out.println("try " + s))
    .lift(db
        .select("SELECT id FROM mytable ORDER BY updated_at ASC")
        .dependsOnOperator()
        .getTupleN()
    )
    .doOnNext(s -> System.out.println("result " + s));

updateObservable.subscribe();

This prints out:

try 0
try 1
...

Although this seems to work fine:

updateObservable = Observable.interval(0, 10, TimeUnit.SECONDS)
    .doOnNext(s -> System.out.println("try " + s))
    .lift(db
        .select("SELECT id FROM mytable WHERE id <> ? ORDER BY updated_at ASC")
        .parameterOperator()
        .getTupleN()
    )
    .doOnNext(s -> System.out.println("result " + s));
updateObservable.subscribe();

Because it prints the following:

try 0
result TupleN [values=[18]]
result TupleN [values=[16]]
result TupleN [values=[17]]
try 1
result TupleN [values=[18]]
result TupleN [values=[16]]
result TupleN [values=[17]]
...

Am I doing something wrong in the first example ?

FYI I've simplified the above example but what I really want to do is to continuously poll that request and process its elements. It would probably look like that:

updateObservable = Observable.interval(0, 1, TimeUnit.NANOSECONDS)
    .onBackpressureLatest()
    .lift(db
        .select("SELECT id FROM mytable ORDER BY updated_at ASC")
        .dependsOnOperator()
        .getTupleN()
    )
    .<List<String>> compose(new MyTableTransformer())
    .lift(db
        .update("UPDATE mytable SET updated_at = ? WHERE id = ?")
        .parameterOperator()
    );

updateObservable.subscribe();

Thanks

How to specify a schema to be used

Hi.

Is there a way to specify what schema to be used when executing statements? As of 9.4 you can specify currentSchema as a connection parameter but unfortunately I can't use that for my use case.

I'm sorry if this issue is in the wrong place, but I was unsure where to put it.

Take operator combined with single result prevents the connection from being closed

I noticed a connection leak in our code using rxjava-jdbc. This is how to reproduce it:

    Observable<Long> count =
            database.select("select count(*) from x")
                    .getAs(Long.class)

    CountDownLatch latch = new CountDownLatch(1);
    count.take(1).subscribe(element -> {
        latch.countDown();
    });

The connection used by the query is not closed.
If you change it to take(2), the connection is closed.
I narrowed it down to the fact that OperatorTake overrides the setProducer method limiting the number of requested items.

Using rxjava 1.0.8 and rxjava-jdbc 0.5.1.

Batch operations do not work properly when using a connection pool

I faced this when I tried to implement batch inserts in my application and I was using a connection pool (without the pool setting it works perfectly fine).

It is fairly easy to reproduce using this test:

    @Test
    public void testWithPool() throws InterruptedException {
        Database db = Database.builder()
                .url("jdbc:h2:mem:test1;DB_CLOSE_DELAY=-1")
                .pool(1,32)
                .build()
                .asynchronous();

        db.update("CREATE TABLE TRADE(\n" +
                "        STATUS VARCHAR(64)  ,\n" +
                "PRICE NUMERIC(20,5)  ,\n" +
                "CLIENT_ID VARCHAR(64)  NOT NULL,\n" +
                "QUANTITY INTEGER  ,\n" +
                "TIMESTAMP BIGINT  ,\n" +
                "RECORD_ID VARBINARY(8)  ,\n" +
                "ID VARCHAR(64)  NOT NULL,\n" +
                "        PRIMARY KEY(ID),\n" +
                "CONSTRAINT TRADE_BY_CLIENT_ID UNIQUE (CLIENT_ID, ID) ,\n" +
                "        CONSTRAINT chk_TRADE_STATUS CHECK (STATUS IN ('PENDING', 'COMPLETE')))").count().toBlocking().single();

        int numPeopleBefore = db.select("select count(*) from TRADE") //
                .getAs(Integer.class) //
                .toBlocking().single();
        final List<Observable<Map<String, Object>>> params = new ArrayList<>();
        for(int i = 0; i < 5; i++){
            final Map<String, Object> paramMap = new HashMap<>();
            paramMap.put("ID", "Trade" + i);
            paramMap.put("QUANTITY", 5000 + 1);
            paramMap.put("PRICE", new BigDecimal(32.44 + i));
            paramMap.put("STATUS", "PENDING");
            paramMap.put("CLIENT_ID", "Client1");
            paramMap.put("TIMESTAMP", System.currentTimeMillis());
            paramMap.put("RECORD_ID", ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array());
            params.add(Observable.just(paramMap));
        }

        Observable<Integer> count = db.update("insert into TRADE(ID,QUANTITY,PRICE,STATUS,CLIENT_ID,TIMESTAMP,RECORD_ID) values(:ID,:QUANTITY,:PRICE,:STATUS,:CLIENT_ID,:TIMESTAMP,:RECORD_ID)")
                .dependsOn(db.beginTransaction())
                // set batch size
                .batchSize(3)
                // get parameters from last query
                .parameters(Observable.merge(params))
                // go
                .count()
                // end transaction
                .count();
        assertTrue(db.commit(count).toBlocking().single());
        int numPeople = db.select("select count(*) from TRADE") //
                .getAs(Integer.class) //
                .toBlocking().single();
        assertEquals(numPeopleBefore + 5, numPeople);
    }

This is the output using Hikari 2.6.0 and H2 1.4.192.
H2FailedTest.txt

If I use Hikari 2.3.13 (like rxjava-jdbc project does) it still fails. Seems like the pool is shut down after the batch insert due to a failure at trying to close the connection.

This is the output if I replace H2 for HSQLDB 2.3.2 by replacing the url for this "jdbc:hsqldb:mem:aname;user=user;" :

HSQLDBFailedTest.txt

Is there something I am not taking into account?

Example to handle failures in Transaction: Getting connection-timeout for subsequent requests

Transation 'firstTransaction' has two insert queries Q1 and Q2. In the following example query Q2 fails due to primary key constraint violation.

      @Test
    public void test() {
        Observable<Integer> firstTransactionFirstPart = database.update(queryThatInsertsinTableWithPrimaryKeyA)
                .dependsOn(database.beginTransaction())
                .parameters(new Object[0])
                .count();
        Observable<Integer> firstTransactionSecondPart = database.update(queryThatInsertsinTableWithPrimaryKeyA)
                .dependsOn(firstTransactionFirstPart)
                .parameters(new Object[0])
                .count();
        Observable<Boolean> firstTransaction = database.commit(firstTransactionSecondPart);
        firstTransaction.subscribe(...);//logging

If we attempt to run some another subsequent transaction, we get an exception

    com.github.davidmoten.rx.jdbc.exceptions.SQLRuntimeException: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
at com.github.davidmoten.rx.jdbc.ConnectionProviderPooled.get(ConnectionProviderPooled.java:92)
at com.github.davidmoten.rx.jdbc.ConnectionProviderSingletonManualCommit.get(ConnectionProviderSingletonManualCommit.java:43)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.getConnection(QueryUpdateOnSubscribe.java:127)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:80)
at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:22)
at rx.Observable.unsafeSubscribe(Observable.java:8460)

Max pool size is 1. (it is reproducible on even for other values). Full method impl
Versions:
rxjava-jdbc: 0.7.2 , 0.7.3
Hikari: 2.5.1
postgresql: 9.4.1212
Most likely m doing wrong, but m not able to figure out :(

Help: Example of returnGeneratedKeys within a transaction

Hello,

I'm trying to chain two queries, an insert followed by a select using the generated id of the first query as a parameter of the second query.
Can you provide an example to achieve that ? Thank you.

Here is my code so far :

Observable<Boolean> begin = db.beginTransaction();
Observable<Long> query = db.update("insert into users...").dependsOn(begin).parameters(...).returnGeneratedKeys().getAs(Long.class).first();
Observable<Boolean> commit = db.commit(query);
db.select("select from users where id = ?").dependsOn(commit).getAs(String.class);

Add support for backpressure

Once rxjava 0.20.0 release available and if solid add support for backpressure to the custom operators in this project.

Handling Arrays

I've run into a few issues when attempting to handle arrays. The first issue is that I am using jooq to generate the sql for the request, it provides a request like so:

INSERT into my_table (key1, array_key) VALUES (?, ?::varchar[])

The casting gets detected by NamedParameters has being a parameter with a name. This one is fairly simple to resolve by adding an explicit check for a second ':' like so:

                if (c == '\'') {
                    inSingleQuote = true;
                } else if (c == '"') {
                    inDoubleQuote = true;
                } else if (c == ':' && i + 1 < length) {
                    final char iPlusOne = namedSql.charAt(i + 1);
                    if (Character.isJavaIdentifierStart(iPlusOne)) {
                        int j = i + 2;
                        while (j < length && Character.isJavaIdentifierPart(namedSql.charAt(j))) {
                            j++;
                        }
                        String name = namedSql.substring(i + 1, j);
                        c = '?'; // replace the parameter with a question mark
                        i += name.length(); // skip past the end if the parameter
                        names.add(name);
                    } else if (iPlusOne == ':') {
                        i++;
                    }
                }

The more difficult issue with handling arrays is that the setParameters currently doesn't handle it. My thoughts on this were that a plugin system, similar to the one available in RxJava, could be provided to give users access to the Prepared Statement and object for unknown types.

This way, for types that can't be converted by setObject, like arrays or Java 8 time types, they could be handled. Also, I am not positive on the cross database implications of handling array types.

What are your thoughts? I can provide a PR if you agree with the direction.

Benchmarks

Benchmarks would be nice to help make performance improvements.

I propose to use JMH with some benchmarks that allow us to compare using straight jdbc patterns (lots of try-catch-finally } to using rxjava-jdbc. I would probably use embedded H2 database as per unit tests but open to suggestions.

Consider a "hibernate like" criteria maker:

Hi,

Since there is no plan to turn RxJDBC into a ORM, but perhaps ONLY this portion of the
Library will be useful for the user of your library to avoid the JDBC-(Postgress/MySQL/Oracle/MsSQL...)SQL conversion error.

Consider a hack into this Hibernate Code and how to merge into your library:
How to get SQL from Hibernate Criteria API (not for logging)
http://stackoverflow.com/questions/554481/how-to-get-sql-from-hibernate-criteria-api-not-for-logging

CriteriaImpl criteriaImpl = (CriteriaImpl)criteria;
SessionImplementor session = criteriaImpl.getSession();
SessionFactoryImplementor factory = session.getFactory();
CriteriaQueryTranslator translator=new CriteriaQueryTranslator(factory,criteriaImpl,criteriaImpl.getEntityOrClassName(),CriteriaQueryTranslator.ROOT_SQL_ALIAS);
String[] implementors = factory.getImplementors( criteriaImpl.getEntityOrClassName() );

CriteriaJoinWalker walker = new CriteriaJoinWalker((OuterJoinLoadable)factory.getEntityPersister(implementors[0]), 
                        translator,
                        factory, 
                        criteriaImpl, 
                        criteriaImpl.getEntityOrClassName(), 
                        session.getLoadQueryInfluencers()   );

String sql=walker.getSQLString();

Regex Support for Column Name "get()" arguments?

I finally got around to playing with rxjava-jdbc and I love it! But I have one idea that may get shot down, but I thought I'd give it a try.

My current database wrapper I developed internally has getter methods on my TabularResultSet, which I've built around the ResultSet.

These getter methods are pretty identical to the ones in this library when calling get(). You can call getInt(), getString(),getBool() and pass in a String for the column name to get the respective value for. However, my library also allows you to pass a column name expressed as a regex so more than one naming convention can be accepted for a column. While a little loose, this allows multiple data sources to be swapped in easily that have different column conventions.

So maybe I'm being a little self-focused, but it would be awesome (at least for me) to have multiple column names to be supported in getters via an optional regex feature.

regex_getter

What do you guys think? Would anybody else benefit from this or am I a niche?

And thanks again for creating an awesome library, I really look forward to using this in my projects.

Named parameters

For simple queries jdbc parameters using ? are fine but more complex queries would benefit from named parameters (:name for instance).

This is quite doable, just needs good integration with the API.

Was thinking

db.named() gives builders that always use named parameters in sql and runtime checks on the sql format would occur.

Will you consider taking the Dao/Model pattern of JDBI and SpringData?

Will you please also consider taking these code pattern from JDBI or SpringData?
http://jdbi.org/

public interface MyDAO
{
  @SqlUpdate("create table something (id int primary key, name varchar(100))")
  void createSomethingTable();

  @SqlUpdate("insert into something (id, name) values (:id, :name)")
  void insert(@Bind("id") int id, @Bind("name") String name);

  @SqlQuery("select name from something where id = :id")
  String findNameById(@Bind("id") int id);

  /**
   * close with no args is used to close the connection
   */
  void close();
}```

OR Model+Repository pattern from springdata:
https://github.com/spring-projects/spring-data-book/tree/master/mongodb

```java
package com.oreilly.springdata.mongodb.core;
import org.springframework.data.repository.Repository;

/**
 * Repository interface to access {@link Customer}s.
 * 
 * @author Oliver Gierke
 */
public interface CustomerRepository extends Repository<Customer, Long> {

    /**
     * Returns the customer with the given identifier.
     * 
     * @param id
     * @return
     */
    Customer findOne(Long id);

    /**
     * Saves the given {@link Customer}. #
     * 
     * @param customer
     * @return
     */
    Customer save(Customer customer);

    /**
     * Returns the {@link Customer} with the given {@link EmailAddress}.
     * 
     * @param string
     * @return
     */
    Customer findByEmailAddress(EmailAddress emailAddress);
}

However have OPTIONS to handle the commonly used pattern in Reactive or Sync model??

Implement Iterator<ResultSet>

Hey David!

My team and I have become so happy with this library and its features, we occasionally want to use it in a non-reactive capacity. It would be helpful if the QuerySelect.Builder had a method to return an Iterator<ResultSet> and not just an Observable<ResultSet>. I might have researched and developed a way you can turn an Observable<ResultSet> into a Java 8 Stream<ResultSet> or Kotlin Sequence<ResultSet> easily without major disruption to this library's underlying architecture.

Of course, you can call toBlocking() and get an Iterator but it does some buffering which is unsafe for a stateful ResultSet. Fortunately I think I did the hard work and I'll test it some more. The first thing needed is a specialized synchronizer which I researched on SO and adapted for this purpose.

public final class SingleBlockingQueue<T> {
    private volatile T value;
    private final Semaphore full = new Semaphore(0);
    private final Semaphore empty = new Semaphore(1);
    private volatile boolean isDone = false;

    private volatile boolean hasValue = false;

    public void offer(T value) throws InterruptedException {
        empty.acquire();
        this.value = value;
        hasValue = true;
        full.release();
    }

    public T take() throws InterruptedException {
        full.acquire();
        T returnValue = value;
        value = null; // Should release reference
        hasValue = false;
        empty.release();
        return returnValue;
    }
    public void setDone() {
        isDone = true;
    }
    public boolean isDone() {
        return isDone;
    }
    public boolean hasValue() {
        return hasValue || !isDone;
    }
}

And then you need an ObservableIterator that uses this synchronizer to safely relay the ResultSet from the Observable to the Iterator. Sorry I wrote it in Kotlin, I can rewrite it in Java if needed.

class ObservableIterator<T>(
        observable: Observable<T>
) : Iterator<T>, Closeable {

    private val queue = SingleBlockingQueue<T>()

    private val subscription =
            observable
                    .subscribeWith {
                        onNext { queue.offer(it) }
                        onCompleted { queue.setDone() }
                        onError { queue.setDone() }
                    }

    override fun hasNext(): Boolean {
        return queue.hasValue()
    }

    override fun next(): T {
        return queue.take()
    }
    override fun close() {
        subscription.unsubscribe()
        queue.setDone()
    }
}

This will allow the use of RxJava-JDBC in a functional, serialized, but non-reactive way without having to collect and cache data. What do you think?

README: TupleN<String> tupleN = database.select(query).create()....

Hello,

I am trying to use your API but in the readme it is declared to use the TupleN as:

TupleN tupleN = database.select(query).create().getTupleN(String.class).toBlocking().single();

but create is not found and the above statement does not work (only empty strings in the tupleN object), any workaround?

Create doOnNext()-like Operator that provides access to SQL?

Hi David,

Maybe there's a feature in the logging framework I'm not using, but it would be helpful if somehow I could get the parameterized SQL String emitted on a doOnNext() fashion every time a query is executed.

myDb.select("SELECT * FROM MY_TABLE WHERE EFF_TO = ?")
    .parameters(effToDates)
    .sqlOnNext(sql -> System.out.println(sql))
    .get(rs -> new MyItem(rs));

Would this be hard to add to the QuerySelect.Builder class? I've been poking around trying to find a place to put it for a PR, but to no avail yet.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.