rxjava-extras's People

Contributors

akarnokd, bryant1410, davidmoten, dependabot[bot], goznauk, igouss, jlleitschuh, petikoch, sparty02

rxjava-extras's Issues

What about proguard?

I have these issues:

com.github.davidmoten.rx.Serialized: can't find referenced class com.esotericsoftware.kryo.Kryo |  
-- | --
com.github.davidmoten.rx.Serialized$KryoBuilder: can't find referenced class com.esotericsoftware.kryo.Kryo |  
com.github.davidmoten.rx.Serialized$KryoBuilder: can't find referenced class com.esotericsoftware.kryo.io.Output |  
com.github.davidmoten.rx.Serialized$KryoBuilder: can't find referenced class com.esotericsoftware.kryo.io.Input |  
com.github.davidmoten.rx.Serialized$KryoBuilder: can't find referenced class com.esotericsoftware.kryo.Kryo |  
com.github.davidmoten.rx.Serialized$KryoBuilder$1: can't find referenced class com.esotericsoftware.kryo.io.Output |  
com.github.davidmoten.rx.Serialized$KryoBuilder$2: can't find referenced class com.esotericsoftware.kryo.io.Output |  
com.github.davidmoten.rx.Serialized$KryoBuilder$2$1: can't find referenced class com.esotericsoftware.kryo.Kryo |  
com.github.davidmoten.rx.Serialized$KryoBuilder$2$1: can't find referenced class com.esotericsoftware.kryo.io.Output |  
com.github.davidmoten.rx.Serialized$KryoBuilder$3: can't find referenced class com.esotericsoftware.kryo.io.Output |  
com.github.davidmoten.rx.Serialized$KryoBuilder$4: can't find referenced class com.esotericsoftware.kryo.io.Input |  
com.github.davidmoten.rx.Serialized$KryoBuilder$5: can't find referenced class com.esotericsoftware.kryo.io.Input |  
com.github.davidmoten.rx.Serialized$KryoBuilder$6: can't find referenced class com.esotericsoftware.kryo.io.Input |  
com.github.davidmoten.rx.Serialized$KryoBuilder$7: can't find referenced class com.esotericsoftware.kryo.io.Input |  
com.github.davidmoten.rx.Serialized$KryoBuilder$7: can't find referenced class com.esotericsoftware.kryo.Kryo |  
com.github.davidmoten.rx.Serialized$KryoBuilder$7: can't find referenced class com.esotericsoftware.kryo.io.Input |  
com.github.davidmoten.rx.testing.TestSubscriber2: can't find referenced class org.junit.Assert |  
com.github.davidmoten.rx.testing.TestingHelper$TestSuiteFromCases: can't find referenced class org.junit.runners.Suite$SuiteClasses |  
com.github.davidmoten.rx.testing.TestingHelper$TestSuiteFromCases: can't find referenced class org.junit.runners.Suite |  
com.github.davidmoten.rx.testing.TestingHelper$TestSuiteFromCases: can't find referenced class org.junit.runner.RunWith |  
com.github.davidmoten.rx.testing.TestingHelper$TestSuiteFromCases: can't find referenced class org.junit.runners.Suite |  
com.github.davidmoten.rx.testing.TestingHelper$TestSuiteFromCases: can't find referenced class org.junit.runners.Suite$SuiteClasses |  
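
These warnings point at Kryo and JUnit classes that ProGuard cannot find on the classpath. A minimal sketch of one common workaround is to tell ProGuard to ignore the missing references. It assumes the app never calls the Kryo-based parts of `Serialized` or the `com.github.davidmoten.rx.testing` helpers at runtime; that is an assumption about the setup, not a configuration confirmed by the library author.

```
# Assumes the Kryo-based Serialized helpers and the JUnit-based testing
# helpers are never used by the app at runtime.
-dontwarn com.esotericsoftware.kryo.**
-dontwarn org.junit.**
```

If either feature is actually used, adding the corresponding dependency to the build is the safer route.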

toListWhile for Stream API

Do you know how to implement the same functionality as toListWhile using the Java 8 Stream API (no Rx)?
Thank you.
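
One possible approach without RxJava is to walk the elements and apply the same `(list, item)` condition by hand. The following is a minimal sketch, assuming an in-memory `Iterable` source; `ListWhile` and its `toListWhile` method are hypothetical names and not part of the JDK or rxjava-extras.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

public final class ListWhile {

    // Groups consecutive items into lists. An item joins the current list
    // while condition.test(currentList, item) is true; otherwise the current
    // list is closed and the item starts a new one.
    public static <T> List<List<T>> toListWhile(Iterable<T> source,
            BiPredicate<List<T>, T> condition) {
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            // Never emit an empty list, even if the condition rejects the first item
            if (!condition.test(current, item) && !current.isEmpty()) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(10, 5, 2, -1, -2, -5, -1, 2, 5, 6);
        // Chunk by sign: negative values in one run, non-negative in another
        List<List<Integer>> chunks = toListWhile(numbers,
                (list, t) -> list.isEmpty() || (list.get(0) < 0) == (t < 0));
        System.out.println(chunks); // [[10, 5, 2], [-1, -2, -5, -1], [2, 5, 6]]
    }
}
```

The result can be turned back into a stream with `chunks.stream()`. A lazy variant would need a custom `Spliterator`, which this sketch deliberately leaves out.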

ThrowbackZip Operator?

Hi David, a while back you answered my SO question and created some very helpful toListWhile() operators for my particular problem.
http://stackoverflow.com/questions/31523884/rxjava-group-emit-and-zip-sorted-chunks-with-a-common-property

Here's a fun little complication... what if a partitionId value only existed in one of the two Observable streams?

The toListWhile() solves 90% of this particular problem. What would be cool is to have some kind of throwbackZip() operator that behaves like a zip(), but performs a given Func2<X,Y,Boolean> test between the two zipped items first. If that test fails, an Action3<X,Y,ZipThrowback> is provided, where ZipThrowback is a class that allows you to substitute a value for X or Y for the zip operation, and you can also have the X or Y repeated so it will ultimately zip with the correct partner.

    Observable<Item> items1 = ...;
    Observable<Item> items2 = ...;

    Observable<List<Item>> itemLists1 = items1.compose(Transformers.toListWhile(
        (list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));

    Observable<List<Item>> itemLists2 = items2.compose(Transformers.toListWhile(
        (list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));

    // throwbackZip and ZipThrowback are the proposed API; they do not exist yet
    Observables.throwbackZip(itemLists1, itemLists2,
        (l1, l2) -> l1.get(0).partitionId == l2.get(0).partitionId,
        (l1, l2, zipThrowback) -> {
            if (l1.get(0).partitionId > l2.get(0).partitionId) {
                zipThrowback.substituteLeft(Collections.emptyList());
                zipThrowback.resendLeft(l1);
            } else {
                zipThrowback.substituteRight(Collections.emptyList());
                zipThrowback.resendRight(l2);
            }
        },
        //standard zip function goes here
        )
        //etc

What do you think? Is this a niche need or something that would be useful?
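
To make the intended pairing concrete, here is a minimal plain-Java sketch of the behaviour described above, working on in-memory lists rather than on Observables. `AlignedZip`, `Chunk` and `align` are hypothetical names used only for illustration, and the sketch assumes both inputs are sorted by ascending partitionId.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class AlignedZip {

    // A run of items sharing one partitionId (hypothetical type).
    public static final class Chunk {
        final int partitionId;
        final List<String> items;

        public Chunk(int partitionId, String... items) {
            this.partitionId = partitionId;
            this.items = Arrays.asList(items);
        }
    }

    // Pairs chunks with equal partitionId. When an id exists on one side
    // only, that chunk is paired with an empty list and the chunk on the
    // other side is held back for the next round.
    public static List<String> align(List<Chunk> left, List<Chunk> right) {
        List<String> pairs = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() || j < right.size()) {
            Chunk l = i < left.size() ? left.get(i) : null;
            Chunk r = j < right.size() ? right.get(j) : null;
            if (r == null || (l != null && l.partitionId < r.partitionId)) {
                // Left id has no partner: substitute an empty right side
                pairs.add(l.partitionId + ":" + l.items + "+[]");
                i++;
            } else if (l == null || r.partitionId < l.partitionId) {
                // Right id has no partner: substitute an empty left side
                pairs.add(r.partitionId + ":[]+" + r.items);
                j++;
            } else {
                pairs.add(l.partitionId + ":" + l.items + "+" + r.items);
                i++;
                j++;
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Chunk> left = Arrays.asList(new Chunk(1, "a"), new Chunk(2, "b"), new Chunk(4, "c"));
        List<Chunk> right = Arrays.asList(new Chunk(1, "x"), new Chunk(3, "y"), new Chunk(4, "z"));
        System.out.println(align(left, right));
        // [1:[a]+[x], 2:[b]+[], 3:[]+[y], 4:[c]+[z]]
    }
}
```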

toCombinedList() operator

Thought I'd offer this particular Transformer since I am using it a lot and maybe others might find value in it. David, if you want this CombinedListTransformer let me know and I can create a PR.

This SO post will provide detailed context on what it does and how it came about.
http://stackoverflow.com/questions/32292509/rxjava-consolidating-multiple-infinite-observablelistt/32312586#32312586

Essentially, it transforms an Observable<List<T>> with a mapping for each T item to another Observable<List<R>>, consolidates each latest emitted List<R> into a single List<R>, and emits that Observable<List<R>> result.

It was tedious enough to compose that a Transformer felt warranted.

    public final class CombinedListTransformer<T,R> implements Observable.Transformer<List<T>,List<R>> {

        private final Func1<T,Observable<List<R>>> listMapper;

        public CombinedListTransformer(Func1<T,Observable<List<R>>> listMapper) {
            this.listMapper = listMapper;
        }
        @Override
        public Observable<List<R>> call(Observable<List<T>> sourceList) {
            return sourceList.flatMap(sl ->
                Observable.<Observable<List<R>>>create(s -> {
                    for (T t : sl) {
                        s.onNext(listMapper.call(t));
                    }
                    s.onCompleted();
                }).toList() // List<Observable<List<R>>>
                .flatMap(consolidatedChildList -> Observable.combineLatest(consolidatedChildList, args -> {
                    ArrayList<R> list = new ArrayList<>();
                    for (Object obj : args) {
                        list.addAll((List<R>) obj);
                    }
                    return list;
                }))
            );
        }
    }

why use .isEmpty() ??

I don't understand why I have to use .isEmpty(). It's not obvious to me, let alone to someone reading the code.

Observable.just(10, 5, 2, -1, -2, -5, -1, 2, 5, 6)
    .compose(Transformers.toListWhile(
        (list, t) -> list.isEmpty()
            || Math.signum(list.get(0)) < 0 && Math.signum(t) < 0
            || Math.signum(list.get(0)) >= 0 && Math.signum(t) >= 0))
    .forEach(System.out::println);
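
A likely explanation, judging only from the shape of the condition: it receives the list built so far together with the next item, and for the very first item that list is presumably still empty, so `list.get(0)` would throw without the guard. The plain-Java sketch below shows that failure in isolation. It does not use RxJava, and `EmptyGuard` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public final class EmptyGuard {

    // Same-sign check without the guard: always reads list.get(0).
    public static final BiPredicate<List<Integer>, Integer> UNGUARDED =
            (list, t) -> (list.get(0) < 0) == (t < 0);

    // Same check with the guard: an empty list accepts any first item.
    public static final BiPredicate<List<Integer>, Integer> GUARDED =
            (list, t) -> list.isEmpty() || UNGUARDED.test(list, t);

    public static void main(String[] args) {
        List<Integer> empty = new ArrayList<>();
        // The guard short-circuits, so list.get(0) is never evaluated
        System.out.println(GUARDED.test(empty, 10)); // true
        try {
            UNGUARDED.test(empty, 10);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("unguarded condition failed on the empty list");
        }
    }
}
```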

rxkotlin-extras

Hi David,

I am thinking of creating an rxkotlin-extras project that will express rxjava-extras as Kotlin extension functions for the Observable<T>. That way lift() and compose() do not have to be called in Kotlin code.

val source = Observable.just("Alpha","Beta","Gamma")
source.doOnFirst { println("First element is $it") }.subscribe { println(it) }

I wanted your blessing before possibly embarking on this, or to give you the first chance to do it if you'd like to own it.

Resource Manager

I have this bit of code that I've used a couple of times and I figured it could use a good home. It's a class for currying the resource creation and cleanup of the rx.Observable#using function. Would this be a good fit for this project?

import rx.Observable;
import rx.functions.Func1;

public class ResourceManager<T> {
    public static interface CheckedFunc0<R> {
        public R call() throws Exception;
    }

    public static interface CheckedAction1<T> {
        public void call(T t) throws Exception;
    }

    private CheckedFunc0<T> resourceFactory;
    private CheckedAction1<? super T> disposeAction;

    public ResourceManager(final CheckedFunc0<T> resourceFactory, final CheckedAction1<? super T> disposeAction) {
        this.resourceFactory = resourceFactory;
        this.disposeAction = disposeAction;
    }

    /**
     * The resource T is available for use by the function passed in until the {@link Observable} returned is unsubscribed from.
     * 
     * @param func
     * @return
     */
    public final <R> Observable<R> checkout(final Func1<? super T, ? extends Observable<? extends R>> func) {
        return Observable.using(() -> {
            try {
                return resourceFactory.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } , func, (rsrc) -> {
            try {
                disposeAction.call(rsrc);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}

Menu of operators and what they do

From time to time, there is a question on StackOverflow or on RxJava where I think this library has the answer, but it is difficult to point the OP to the right component.

Could you make sure the readme.md is up to date and that each feature in the listing has a short description of what it does?

onBackpressureBufferToFile review please

The ability to buffer streams to disk has been something that I've wondered about for a while.

Can I get people's comments/review of this new operator please?

Transformers.onBackpressureBufferToFile

I'd love to get review of this new operator in terms of

  • what use cases have you got? (volume, serialized size, rate, platform, constraints)
  • overall approach
  • API
  • correctness of code (a big one because of the numerous sections of code subject to concurrency)
  • testing on different OSes (currently only tested on Linux)
  • performance (I've favoured correctness initially)
  • anything else you think of

The code is in the master branch and the runtime jar is on Maven Central, as described in the rxjava-extras README.

A quick way of contributing is to run a long running test (~30 mins) on your machine:

git clone https://github.com/davidmoten/rxjava-extras.git
cd rxjava-extras
./test-long.sh 

On non-*nix platforms, run this command instead of test-long.sh:

mvn clean install -Dmax.small=100000000 -Dmax.medium=300000 -Dmax.seconds=600 -Dloops=10000

Precompile TransformerStringSplit pattern

Hi,

I was looking at TransformerStringSplit and noticed the use of String.split. Wouldn't this compile the pattern for each input String? Or am I missing something?
An idea would be to compile the pattern in the TransformerStringSplit.split method and then use Pattern.split.

Thanks
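
The suggestion can be illustrated with a minimal plain-Java sketch. Per its Javadoc, `String.split(regex, limit)` behaves like `Pattern.compile(regex).split(str, limit)`, so compiling once and reusing the `Pattern` avoids repeated compilation. As far as I know, OpenJDK also has a fast path for single-character delimiters that are not regex metacharacters, so the gain depends on the pattern. `PrecompiledSplit` is a hypothetical class, not the library's TransformerStringSplit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class PrecompiledSplit {

    // Compiled once and reused for every input string.
    private final Pattern pattern;

    public PrecompiledSplit(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Same result as input.split(regex, -1), without recompiling the regex.
    public List<String> split(String input) {
        return Arrays.asList(pattern.split(input, -1));
    }

    public static void main(String[] args) {
        PrecompiledSplit splitter = new PrecompiledSplit("\\s*,\\s*");
        System.out.println(splitter.split("a , b,c")); // [a, b, c]
        System.out.println(splitter.split("d,e"));     // [d, e]
    }
}
```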

Questions

The functionality provided by this library seems very interesting.

Could you please answer the following questions?

  1. I feel some of the functionality in this library is really helpful and is missing from RxJava (e.g., Transformers.orderedMergeWith). Is there any plan to merge these features into RxJava? Is there any reason this hasn't been done yet?
  2. Do you keep track of who is currently using this library?
  3. Is there a plan to move this from a personal GitHub account to an Apache project?

No terminal event from Transformers.orderedMergeWith(Observable<T>, Comparator) if Comparator throws exception

If the Comparator passed to Transformers.orderedMergeWith throws an exception, the resulting observable produces neither more results nor any terminal event. Basically, the following code just blocks forever.

How to reproduce:

        try {
            System.out.println("started");
            Observable<Integer> o1 = Observable.range(0, 10000);
            Observable<Integer> o2 = Observable.range(0, 10000);


            Comparator<Integer> comparator = new Comparator<Integer>() {

                AtomicInteger i = new AtomicInteger(0);
                @Override
                public int compare(Integer o1, Integer o2) {
                    if (i.getAndIncrement() > 1000) {
                        throw new RuntimeException("failed");
                    }
                    return o1 - o2;

                }
            };
            Observable<Integer> res = o1.compose(Transformers.orderedMergeWith(o2, comparator));

            System.out.println("finished " + res.toList().toBlocking().single().size());
        } catch (Exception e) {
            e.printStackTrace();
        }

@davidmoten can you please take a look?

java.lang.IllegalStateException: Terminal event already emitted.

I get the following error when trying to merge a few files using

flatMap(list -> OrderedMerge.create(list, comparator));

where list is a List<Observable<java.io.File>>.

This is on version 0.7.9.24 (I think our RxJava version is too old for the 0.8.x branch).

java.lang.IllegalStateException: Terminal event already emitted. at rx.observables.SyncOnSubscribe$SubscriptionProducer.onError(SyncOnSubscribe.java:476) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:65) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47) at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459) at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437) at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395) at rx.Subscriber.request(Subscriber.java:157) at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.requestMore(OrderedMerge.java:312) at com.github.davidmoten.rx.internal.operators.OrderedMerge$MergeProducer.emit(OrderedMerge.java:251) at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.onNext(OrderedMerge.java:336) at rx.internal.operators.OperatorCast$CastSubscriber.onNext(OperatorCast.java:69) at rx.observers.Subscribers$5.onNext(Subscribers.java:235) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) at rx.observables.SyncOnSubscribe$SubscriptionProducer.onNext(SyncOnSubscribe.java:490) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:59) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47) at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459) at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437) at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OperatorCast$CastSubscriber.setProducer(OperatorCast.java:94) at rx.Subscriber.setProducer(Subscriber.java:205) at rx.Subscriber.setProducer(Subscriber.java:205) at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:62) at 
rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:43) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:94) at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:32) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:85) at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:27) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77) at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102) at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85) at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:98) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97) at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.fastPath(OnSubscribeFromIterable.java:190) at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:86) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63) at 
rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77) at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:100) at rx.internal.operators.DeferredScalarSubscriber.onCompleted(DeferredScalarSubscriber.java:73) at rx.internal.operators.OperatorBufferWithSize$BufferExact.onCompleted(OperatorBufferWithSize.java:130) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.producers.SingleProducer.request(SingleProducer.java:75) at rx.Subscriber.setProducer(Subscriber.java:211) at 
rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:110) at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:102) at rx.internal.operators.OnSubscribeTakeLastOne$TakeLastOneSubscriber.onCompleted(OnSubscribeTakeLastOne.java:57) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted(OperatorSwitchIfEmpty.java:70) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97) at rx.internal.operators.OperatorTake$1.onCompleted(OperatorTake.java:56) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.checkTerminated(OnSubscribeFlattenIterable.java:308) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:180) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onCompleted(OnSubscribeFlattenIterable.java:147) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.checkTerminated(OnSubscribeFlattenIterable.java:308) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:180) at 
rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onCompleted(OnSubscribeFlattenIterable.java:147) at rx.internal.operators.OnSubscribeRedo$4$1.onCompleted(OnSubscribeRedo.java:321) at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:239) at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onCompleted(OperatorZip.java:307) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97) at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:298) at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:284) at rx.internal.operators.NotificationLite.accept(NotificationLite.java:152) at rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext(SubjectSubscriptionManager.java:255) at rx.subjects.BehaviorSubject.onNext(BehaviorSubject.java:161) at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92) at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67) at rx.internal.operators.OnSubscribeRedo$2$1.onCompleted(OnSubscribeRedo.java:228) at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onCompleted(OperatorOnErrorResumeNextViaFunction.java:101) at retrofit.RxSupport$2.run(RxSupport.java:57) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

It swallows the original exception:

java.io.IOException: Stream closed at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) at java.io.BufferedInputStream.fill(BufferedInputStream.java:214) at java.io.BufferedInputStream.read(BufferedInputStream.java:265) at java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2303) at java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2596) at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2606) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:58) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47) at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459) at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437) at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395) at rx.Subscriber.request(Subscriber.java:157) at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.requestMore(OrderedMerge.java:312) at com.github.davidmoten.rx.internal.operators.OrderedMerge$MergeProducer.emit(OrderedMerge.java:251) at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.onNext(OrderedMerge.java:336) at rx.internal.operators.OperatorCast$CastSubscriber.onNext(OperatorCast.java:69) at rx.observers.Subscribers$5.onNext(Subscribers.java:235) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) at rx.observables.SyncOnSubscribe$SubscriptionProducer.onNext(SyncOnSubscribe.java:490) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:59) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47) at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459) at 
rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437) at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OperatorCast$CastSubscriber.setProducer(OperatorCast.java:94) at rx.Subscriber.setProducer(Subscriber.java:205) at rx.Subscriber.setProducer(Subscriber.java:205) at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:62) at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:43) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:94) at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:32) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:85) at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:27) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77) at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102) at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85) at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:98) at 
rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97) at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.fastPath(OnSubscribeFromIterable.java:190) at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:86) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63) at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77) at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:100) at rx.internal.operators.DeferredScalarSubscriber.onCompleted(DeferredScalarSubscriber.java:73) at rx.internal.operators.OperatorBufferWithSize$BufferExact.onCompleted(OperatorBufferWithSize.java:130) at 
rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.producers.SingleProducer.request(SingleProducer.java:75) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:110) at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:102) at rx.internal.operators.OnSubscribeTakeLastOne$TakeLastOneSubscriber.onCompleted(OnSubscribeTakeLastOne.java:57) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted(OperatorSwitchIfEmpty.java:70) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onCompleted(OperatorOnErrorResumeNextViaFunction.java:101) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at retrofit.RxSupport$2.run(RxSupport.java:57) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

Did anyone else experience such an issue?

Transformer.orderedMergeWith: Allow configuring Source Observable Backpressure BufferSize

Hi David,

We are using rxjava-extras as a dependency:
https://mvnrepository.com/artifact/com.microsoft.azure/azure-cosmosdb/1.0.0
https://github.com/Azure/azure-cosmosdb-java

We are using Transformer.orderedMergeWith.
It appears to me that the backpressure implementation for this operator buffers RxRingBuffer.SIZE items for each of the source observables. Am I right?

If this is not already supported, would it be possible to provide an overload of Transformer.orderedMergeWith that allows configuring the buffer size for each source observable? It is critical for our use case.

Thank you @davidmoten
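
To make the request concrete, here is a minimal, self-contained sketch of an ordered merge in which each source is read through a buffer of configurable size. It is plain Java over iterators, not the rxjava-extras implementation; the names `OrderedMergeSketch`, `BufferedSource` and the `bufferSize` parameter are hypothetical and only illustrate what such an overload could control. It assumes both inputs are already sorted and contain no null elements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class OrderedMergeSketch {

    /** Reads one sorted source in batches of at most bufferSize items (hypothetical helper). */
    static final class BufferedSource<T> {
        private final Iterator<T> source;
        private final int bufferSize;
        private final Deque<T> buffer = new ArrayDeque<>();

        BufferedSource(Iterator<T> source, int bufferSize) {
            if (bufferSize < 1) {
                throw new IllegalArgumentException("bufferSize must be >= 1");
            }
            this.source = source;
            this.bufferSize = bufferSize;
        }

        /** Refills the buffer when it is empty, loosely analogous to requesting a batch upstream. */
        T peek() {
            if (buffer.isEmpty()) {
                while (buffer.size() < bufferSize && source.hasNext()) {
                    buffer.add(source.next());
                }
            }
            return buffer.peek(); // null once the source is exhausted
        }

        T poll() {
            return buffer.poll();
        }
    }

    /** Merges two sorted sources, holding at most bufferSize items per source at any time. */
    static <T> List<T> orderedMerge(Iterator<T> a, Iterator<T> b,
                                    Comparator<? super T> comparator, int bufferSize) {
        BufferedSource<T> left = new BufferedSource<>(a, bufferSize);
        BufferedSource<T> right = new BufferedSource<>(b, bufferSize);
        List<T> out = new ArrayList<>();
        while (true) {
            T x = left.peek();
            T y = right.peek();
            if (x == null && y == null) {
                break; // both sources exhausted
            }
            if (y == null || (x != null && comparator.compare(x, y) <= 0)) {
                out.add(left.poll());
            } else {
                out.add(right.poll());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> merged = orderedMerge(
                Arrays.asList(1, 4, 7, 10).iterator(),
                Arrays.asList(2, 3, 8).iterator(),
                Comparator.<Integer>naturalOrder(),
                2);
        System.out.println(merged); // prints [1, 2, 3, 4, 7, 8, 10]
    }
}
```

With a small `bufferSize`, memory held per source stays bounded at the cost of more frequent refills, which is the trade-off we would like to be able to tune.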

Proguard configuration

Hi David,

Could you help with the ProGuard configuration? I get the following warnings:

Warning:com.github.davidmoten.rx.Jaxws$ObservableAdapter: can't find superclass or interface javax.xml.bind.annotation.adapters.XmlAdapter
Warning:com.github.davidmoten.rx.Jaxws$ObservableAdapter: can't find referenced class javax.xml.bind.annotation.adapters.XmlAdapter
Warning:com.github.davidmoten.rx.Serialized: can't find referenced class com.esotericsoftware.kryo.Kryo
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder: can't find referenced class com.esotericsoftware.kryo.Kryo
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder: can't find referenced class com.esotericsoftware.kryo.io.Output
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder: can't find referenced class com.esotericsoftware.kryo.io.Input
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder: can't find referenced class com.esotericsoftware.kryo.Kryo
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$1: can't find referenced class com.esotericsoftware.kryo.io.Output
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$2: can't find referenced class com.esotericsoftware.kryo.io.Output
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$2$1: can't find referenced class com.esotericsoftware.kryo.Kryo
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$2$1: can't find referenced class com.esotericsoftware.kryo.io.Output
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$3: can't find referenced class com.esotericsoftware.kryo.io.Output
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$4: can't find referenced class com.esotericsoftware.kryo.io.Input
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$5: can't find referenced class com.esotericsoftware.kryo.io.Input
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$6: can't find referenced class com.esotericsoftware.kryo.io.Input
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$7: can't find referenced class com.esotericsoftware.kryo.io.Input
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$7: can't find referenced class com.esotericsoftware.kryo.Kryo
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$7: can't find referenced class com.esotericsoftware.kryo.io.Input
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$7: can't find referenced class com.esotericsoftware.kryo.Kryo
Warning:com.github.davidmoten.rx.Serialized$KryoBuilder$7: can't find referenced class com.esotericsoftware.kryo.io.Input
Warning:com.github.davidmoten.rx.testing.TestSubscriber2: can't find referenced class org.junit.Assert
Warning:com.github.davidmoten.rx.testing.TestingHelper$TestSuiteFromCases: can't find referenced class org.junit.runners.Suite$SuiteClasses
Warning:com.github.davidmoten.rx.testing.TestingHelper$TestSuiteFromCases: can't find referenced class org.junit.runners.Suite
Warning:com.github.davidmoten.rx.testing.TestingHelper$TestSuiteFromCases: can't find referenced class org.junit.runner.RunWith
Warning:com.github.davidmoten.rx.testing.TestingHelper$TestSuiteFromCases: can't find referenced class org.junit.runners.Suite
Warning:com.github.davidmoten.rx.testing.TestingHelper$TestSuiteFromCases: can't find referenced class org.junit.runners.Suite$SuiteClasses
Warning:there were 75 unresolved references to classes or interfaces.

Thanks in advance.
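
A common way to handle warnings of this kind is to tell ProGuard not to warn about the optional dependencies. The fragment below is a sketch only, not something confirmed by the project. It assumes the app never calls the Kryo-backed methods of Serialized, the Jaxws adapter, or the testing helpers at runtime. It covers just the packages visible in the listing above, so the remaining unresolved references may need further rules.

```proguard
# Optional dependencies referenced by rxjava-extras but absent from the app.
# Assumption: none of these code paths are used at runtime.
-dontwarn com.esotericsoftware.kryo.**
-dontwarn javax.xml.bind.annotation.adapters.**
-dontwarn org.junit.**
```

If any of these features are actually used, the corresponding library should be added as a dependency instead of suppressing the warning.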

doOnEmpty implementation

This operator was moved to the rxjava-extras master branch from ReactiveX/RxJava#3624.

cc/ @thomasnield (author)

  • converted to OnSubscribeDoOnEmpty to save allocations associated with lift
  • removed done checks because of guarantees of observable contract
  • added unit tests, now at 100% coverage

@thomasnield if I've missed something, please let me know. When it looks good to you, I'll release to Maven Central.
