salesforce / reactive-grpc


Reactive stubs for gRPC

License: BSD 3-Clause "New" or "Revised" License

grpc-java rxjava2 reactor

reactive-grpc's Introduction

What is reactive-grpc?

Reactive gRPC is a suite of libraries for using gRPC with Reactive Streams programming libraries. Using a protocol buffers compiler plugin, Reactive gRPC generates alternative gRPC bindings for each reactive technology. The reactive bindings support unary and streaming operations in both directions. Reactive gRPC also builds on top of gRPC's back-pressure support to deliver end-to-end, back-pressure-based flow control in line with the Reactive Streams back-pressure model.

Reactive gRPC supports the following reactive programming models:

Akka gRPC is now mature and production ready. Use that for Akka-based services.

Usage

See the readme in each technology-specific sub-directory for usage details.

Demos

Android support

Reactive gRPC supports Android to the same extent as the underlying reactive technologies.

  • RxJava - Generated code targets Java 8, so it should work with Android.
  • Spring Reactor - Not officially supported. "Reactor 3 does not officially support or target Android, however, it should work fine with Android SDK 26 (Android O) and above."

Back-pressure

Reactive gRPC stubs support bi-directional streaming with back-pressure. Under the hood, Reactive gRPC is built atop the vanilla gRPC service stubs generated by protoc. As such, they inherit gRPC's HTTP/2-based back-pressure model.

Internally, gRPC and Reactive gRPC implement a pull-based back-pressure strategy. At the HTTP/2 layer, gRPC maintains a buffer of serialized protocol buffer messages. As frames are consumed on the consumer side, the producer is signaled to transmit more frames. If this producer-side transmit buffer fills, the HTTP/2 layer signals the gRPC messaging layer to stop producing new messages in the stream. Reactive gRPC handles this signal, applying back-pressure to Reactive Streams using the Publisher API. Reactive gRPC also implements Publisher back-pressure on the consumer side of a stream. As messages are consumed by the consumer-side Publisher, signals are sent down through gRPC and HTTP/2 to request more data.

An example of back-pressure in action can be found in BackpressureIntegrationTest.java.
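
The pull-based request protocol itself can be sketched without gRPC. The following is a minimal illustration, assuming the JDK's own Reactive Streams interfaces (java.util.concurrent.Flow, Java 9+) rather than any Reactive gRPC class; PullBasedBackpressureDemo and OneAtATimeSubscriber are hypothetical names introduced here. The subscriber asks for one item at a time, and the publisher's buffer is kept deliberately small so that submit() blocks whenever the consumer falls behind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it is not part of Reactive gRPC.
class PullBasedBackpressureDemo {

    /** Pulls exactly one item at a time from its publisher. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: one item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 0..count-1 through a tiny buffer and returns what the subscriber saw. */
    static List<Integer> run(int count) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try {
            // Buffer capacity of 2: submit() blocks while the subscriber's buffer is full.
            try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 2)) {
                publisher.subscribe(subscriber);
                for (int i = 0; i < count; i++) {
                    publisher.submit(i);
                }
            } // close() signals onComplete after the buffered items are delivered
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

In this sketch the blocking submit() call plays the role that the "stop producing" signal plays between HTTP/2 and the gRPC messaging layer: the producer can only get ahead of the consumer by the size of the buffer.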

Understanding Reactive gRPC Flow Control

For simple unary request and response services, Reactive gRPC's flow control model is transparent. However, Reactive gRPC is built on top of three different interacting flow control models, and, as a result, backpressure doesn't always behave exactly as you would expect. For streaming services, flow control isn't always intuitive, especially when infinite streams are involved.

  • At the bottom is HTTP/2's byte-based flow control. HTTP/2 works on streams of bytes and is completely unaware of gRPC messages or reactive streams. By default, the stream consumer allocates a budget of 65535 bytes. The stream producer can send up to this many bytes before backpressure engages. As the consumer reads bytes, WINDOW_UPDATE messages are sent to the producer to increase its send budget.

  • In the middle is the gRPC-Java message-based flow control. gRPC's flow control adapts the stream-based flow control of HTTP/2 to a message-based flow control model. Importantly, gRPC's flow control is aware of how it interacts with HTTP/2 and the network.

    On the producing side, an on-ready handler reads a message, serializes it into bytes using protobuf, and then queues it up for transmission over the HTTP/2 byte stream. If there is insufficient room in the HTTP/2 flow control window to transmit, backpressure engages and no more messages are requested from the producer until space becomes available.

    On the consuming side, each time a consumer calls request(x), gRPC attempts to read and deserialize x messages from the HTTP/2 stream. Since the size of a protobuf encoded message is variable, there is not a one-to-one correlation between pulling messages from gRPC and pulling bytes over HTTP/2.

  • At the top is the Reactive Streams message-based flow control. Reactive Streams' flow control is designed for producing and consuming messages from end to end. Since the producer and consumer are in the same address space, when the consumer calls request(x), the producer creates a message and calls onNext() x times. Reactive Streams flow control assumes all the parts of the chain are linked by method calls. Inserting gRPC and HTTP/2 in the middle of a reactive stream is a bending of the protocol.

When reasoning about flow control with Reactive gRPC, you cannot assume everything works like Reactive Streams. A call to request(1) on the consuming side of the wire will not necessarily result in a request(1) call on the producing side. Zero or more messages may be requested from the producer based on the state of the HTTP/2 flow control window and the serialized size of each protobuf message. Instead, you need to think about how each stage in processing interacts with the stage before and after it.
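
To make the mismatch between message counts and byte budgets concrete, here is a deliberately simplified simulation. It assumes a fixed window of 65,535 bytes (HTTP/2's default initial window size) with no WINDOW_UPDATE frames arriving, and it treats a message as sendable only if it fits entirely in the remaining window, whereas real HTTP/2 can split a message across DATA frames. FlowControlWindowDemo and its method are hypothetical names, not gRPC or Reactive gRPC code.

```java
import java.util.Arrays;

// Hypothetical simulation; it does not call gRPC or HTTP/2 code.
class FlowControlWindowDemo {

    // Assumed window size for the illustration (HTTP/2's default initial window).
    static final int DEFAULT_WINDOW_BYTES = 65_535;

    /**
     * Counts how many serialized messages the producer can queue before the
     * byte window is exhausted and backpressure would engage.
     */
    static int messagesSentBeforeBackpressure(int windowBytes, int[] serializedSizes) {
        int remaining = windowBytes;
        int sent = 0;
        for (int size : serializedSizes) {
            if (size > remaining) {
                break; // not enough room left in the window
            }
            remaining -= size;
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        int[] small = new int[100];
        Arrays.fill(small, 10);     // one hundred 10-byte messages
        int[] large = new int[100];
        Arrays.fill(large, 1_000);  // one hundred 1000-byte messages

        System.out.println(messagesSentBeforeBackpressure(DEFAULT_WINDOW_BYTES, small)); // 100
        System.out.println(messagesSentBeforeBackpressure(DEFAULT_WINDOW_BYTES, large)); // 65
    }
}
```

Under these assumptions the same byte budget admits all one hundred small messages but only 65 of the large ones, which is why the number of messages requested from the producer depends on serialized size rather than on the consumer's request(x) count alone.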

[Figure: flow control. Reactive → gRPC → HTTP/2 → ... → HTTP/2 → gRPC → Reactive]

Exception Handling

Exception handling with Reactive gRPC is a little strange due to the way gRPC deals with errors. Servers that produce an error by calling onError(Throwable) will terminate the call with a StatusRuntimeException. The client will have its onError(Throwable) subscription handler called as expected.

Exceptions going from client to server are a little less predictable. Depending on the timing, gRPC may cancel the request before sending any messages due to an exception in the outbound stream.

Contributing

Found a bug? Think you've got an awesome feature you want to add? We welcome contributions!

Submitting a Contribution

  1. Search for an existing issue. If none exists, create a new issue so that other contributors can keep track of what you are trying to add/fix and offer suggestions (or let you know if there is already an effort in progress). Be sure to clearly state the problem you are trying to solve and an explanation of why you want to use the strategy you're proposing to solve it.
  2. Fork this repository on GitHub and create a branch for your feature.
  3. Clone your fork and branch to your local machine.
  4. Commit changes to your branch.
  5. Push your work up to GitHub.
  6. Submit a pull request so that we can review your changes.

Make sure that you rebase your branch off of master before opening a new pull request. We might also ask you to rebase it if master changes after you open your pull request.

Acceptance Criteria

We love contributions, but it's important that your pull request adhere to the standards that we maintain in this repository.

  • All tests must be passing
  • All code changes require tests
  • All code changes must be consistent with our Checkstyle rules. We use the Google Java Style Guide with a few small alterations.
  • Code should have great inline comments

reactive-grpc's People

Contributors

alexnederlof, andreaslarssons, bruto1, bsideup, cbornet, chbatey, chijoungso, dadadom, dependabot[bot], engineerdev, gertvdijk, hellococooo, jgleitz, joschi, kevmo314, koldat, krakowski, linux-china, lobanovdmitry, matgabriel, mjduijn, nikolay-pshenichny, olegdokuka, rmichela, scottslewis, snyk-bot, svc-scm, tukez, zetten

reactive-grpc's Issues

Non-Mono overload for a non-streamable argument

Hi!

Currently, the code generator creates the following signature:

public Flux<...> doSomething(Mono<SomeType> argument) {
}

And when someone calls it, they have no options but:

stub.doSomething(Mono.just(SomeRequest.builder().build()));

Would be nice to generate an overloaded method:

public Flux<...> doSomething(SomeType argument) {
    return doSomething(Mono.just(argument));
}

I know that Mono is evaluated lazily (as is any other reactive type), but sometimes laziness isn't necessary.

Backpressure

Hi!

Are there any well-known backpressure issues? I get backpressure-related exceptions in my app (based on Reactor) and I have a feeling that they come from reactive-grpc.

I know that there is no "request demand" notion in gRPC and you have to deal with readiness, this is why I expect issues with backpressure by nature.

NetworkOnMainThreadException on two way stream.

Hello, I'm sporadically getting a NetworkOnMainThreadException when pushing an action on a two-way stream. The action is pushed through a subject; code below.

public class GrpcApiImpl implements GrpcApi {

    private final BehaviorProcessor<Session.Action> stringAsyncProcessor;
    private RxSessionServiceGrpc.RxSessionServiceStub stub;

    public GrpcApiImpl(RxSessionServiceGrpc.RxSessionServiceStub stub) {
        this.stub = stub;
        stringAsyncProcessor = BehaviorProcessor.create();
    }

    @Override
    public Flowable<Session.Command> listenForCommands()  {
        return stub.startSession(stringAsyncProcessor.onBackpressureBuffer());
    }

    @Override
    public void postAction(Session.Action action){
        stringAsyncProcessor.onNext(action);
    }
}

Where is the thread in which the action is fired defined?

Perf testing

We need some performance testing to get a handle on the performance impact of reactive grpc.

Gradle Plugin fails when importing protobufs with underscore in filename

I have the following protobuf definitions in src/main/proto:

some_parameter.proto:

syntax = "proto3";

package my.some_parameters;

message SomeParameter {
    string id = 1;
}

my_service.proto:

syntax = "proto3";

package my.some_service;

import "some_parameter.proto";

service MyService {
    rpc DoStuff (my.some_parameters.SomeParameter) returns (my.some_parameters.SomeParameter);
}

build.gradle:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.google.protobuf:protobuf-gradle-plugin:0.8.3"
    }
}

apply plugin: 'java'
apply plugin: 'com.google.protobuf'

repositories {
    mavenCentral()
}

dependencies {
    compile "com.google.protobuf:protobuf-java:3.4.0"
    compile "io.grpc:grpc-all:1.8.0"
    compile 'com.salesforce.servicelibs:reactor-grpc-stub:0.7.1'
}

protobuf {
    protoc {
        // Download from repositories
        artifact = "com.google.protobuf:protoc:3.4.0"
    }
    plugins {
        grpc {
            artifact = "io.grpc:protoc-gen-grpc-java:1.8.0"
        }
        reactor {
            artifact = "com.salesforce.servicelibs:reactor-grpc:0.7.1:jdk8@jar"
        }
    }
    generateProtoTasks {
        all().each { task ->
            task.plugins {
                grpc {}
                reactor {} // including this fails with:
                // /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:28: error: package my.some_parameters.SomeParameter does not exist
                // public reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> doStuff(reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> reactorRequest) {
                //                                                                                                                                                        ^
            }
        }
    }
}

Gradle error message:

gradle clean build
:clean
:extractIncludeProto
:extractProto UP-TO-DATE
:generateProto
:compileJava
/Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:28: error: package my.some_parameters.SomeParameter does not exist
        public reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> doStuff(reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> reactorRequest) {
                                                                                                                                                               ^
/Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:28: error: package my.some_parameters.SomeParameter does not exist
        public reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> doStuff(reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> reactorRequest) {
                                                                           ^
/Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:36: error: package my.some_parameters.SomeParameter does not exist
        public reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> doStuff(reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> request) {
                                                                                                                                                               ^
/Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:36: error: package my.some_parameters.SomeParameter does not exist
        public reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> doStuff(reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> request) {
                                                                           ^
/Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:46: error: package my.some_parameters.SomeParameter does not exist
                                            my.some_parameters.SomeParameter.SomeParameter,
                                                                            ^
/Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:47: error: package my.some_parameters.SomeParameter does not exist
                                            my.some_parameters.SomeParameter.SomeParameter>(
                                                                            ^
/Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:73: error: package my.some_parameters.SomeParameter does not exist
                    com.salesforce.reactorgrpc.stub.ServerCalls.oneToOne((my.some_parameters.SomeParameter.SomeParameter) request,
                                                                                                          ^
/Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:74: error: package my.some_parameters.SomeParameter does not exist
                            (io.grpc.stub.StreamObserver<my.some_parameters.SomeParameter.SomeParameter>) responseObserver,
                                                                                         ^
Note: /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
8 errors
:compileJava FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':compileJava'.
> Compilation failed; see the compiler error output for details.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

BUILD FAILED

Total time: 1.472 secs

If I rename some_parameter.proto to someparameter.proto and change the imports accordingly, compilation and Reactor class generation works.

It also works if I add the following to some_parameter.proto:

option java_outer_classname = "SomeParameterProto";

As it works without the reactor plugin, I would consider this a bug.
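
For context, both workarounds are consistent with how protoc's Java generator is understood to pick the outer class name: it camel-cases the .proto file name and, when that collides with a top-level type in the file, appends OuterClass. The sketch below approximates that rule; OuterClassNameDemo and its methods are hypothetical helpers written for illustration, not protoc's or the Reactor plugin's actual code.

```java
import java.util.Set;

// Hypothetical helper approximating the outer-class naming rule; not protoc's real code.
class OuterClassNameDemo {

    /** Converts a name such as "some_parameter" to "SomeParameter". */
    static String toCamelCase(String fileBaseName) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = true;
        for (char c : fileBaseName.toCharArray()) {
            if (Character.isLetterOrDigit(c)) {
                out.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = Character.isDigit(c); // a letter after a digit starts a new word
            } else {
                upperNext = true; // '_' and other separators start a new word
            }
        }
        return out.toString();
    }

    /** Derives the outer class name, adding "OuterClass" on a name collision. */
    static String outerClassName(String protoFileName, Set<String> topLevelTypeNames) {
        String base = protoFileName.substring(protoFileName.lastIndexOf('/') + 1);
        if (base.endsWith(".proto")) {
            base = base.substring(0, base.length() - ".proto".length());
        }
        String candidate = toCamelCase(base);
        return topLevelTypeNames.contains(candidate) ? candidate + "OuterClass" : candidate;
    }

    public static void main(String[] args) {
        Set<String> types = Set.of("SomeParameter");
        System.out.println(outerClassName("some_parameter.proto", types)); // SomeParameterOuterClass
        System.out.println(outerClassName("someparameter.proto", types));  // Someparameter
    }
}
```

If this rule applies, the message class would live at my.some_parameters.SomeParameterOuterClass.SomeParameter, whereas the generated Reactor code refers to my.some_parameters.SomeParameter.SomeParameter. Renaming the file or setting java_outer_classname removes the collision, which would explain why both workarounds help.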

Request flux subscription is not canceled when channel disconnects.

Request flux subscription will not be canceled when channel disconnects.

Flux<Object> requestFlux = ... // Infinite stream
Flux<Object> responseFlux = serviceStub.manyToMany(requestFlux);
Disposable subscription = responseFlux.subscribe();
// subscription.dispose() cancels the subscription to requestFlux, but a channel disconnect, for example, does not.

I can avoid the problem by wrapping the request flux, but this behaviour seems like a bug to me.

Workaround:

Flux<Object> requestFlux = ...
MonoProcessor<SignalType> stopSignal = MonoProcessor.create();
Flux<Object> stoppableRequestFlux = requestFlux.takeUntilOther(stopSignal);
Flux<Object> responseFlux = serviceStub.manyToMany(stoppableRequestFlux).doFinally(stopSignal::onNext);

Release 0.7.0

I think we are ready to release 0.7.0.

@cbornet Do you agree, or is there more you'd like to see added?

Java ➜ GoLang Interop Testing

Write a gRPC server in GoLang and use Reactive gRPC to call it. Make sure the following work correctly.

  • One-to-one
  • One-to-many
  • Many-to-one
  • Many-to-many

NPE at ReactiveStreamObserverPublisherSubscriptionBase.request

Hi! It looks like there is a race condition under a heavy load, we just got this NPE:

java.lang.NullPointerException: null
at com.salesforce.reactivegrpc.common.ReactiveStreamObserverPublisherBase$ReactiveStreamObserverPublisherSubscriptionBase.request(ReactiveStreamObserverPublisherBase.java:82)
at com.salesforce.reactivegrpc.common.ReactiveBackpressureChunker$1.maybeRequestMore(ReactiveBackpressureChunker.java:91)
at com.salesforce.reactivegrpc.common.ReactiveBackpressureChunker$1.access$100(ReactiveBackpressureChunker.java:41)
at com.salesforce.reactivegrpc.common.ReactiveBackpressureChunker$1$1.request(ReactiveBackpressureChunker.java:59)
at reactor.core.publisher.FluxSwitchMap$SwitchMapInner.requestOne(FluxSwitchMap.java:526)
at reactor.core.publisher.FluxSwitchMap$SwitchMapMain.drain(FluxSwitchMap.java:349)
at reactor.core.publisher.FluxSwitchMap$SwitchMapMain.request(FluxSwitchMap.java:272)
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:179)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.poll(FluxPublishOn.java:562)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:350)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:289)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:873)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1478)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:147)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:190)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:240)
at reactor.core.publisher.FluxDelaySubscription$DelaySubscriptionMainSubscriber.onComplete(FluxDelaySubscription.java:189)
at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:136)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onComplete(FluxRetryWhen.java:167)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:136)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onComplete(FluxTimeout.java:227)
at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:67)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:143)
at reactor.core.publisher.MonoCompletionStage.subscribe(MonoCompletionStage.java:58)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.MonoTimeout.subscribe(MonoTimeout.java:84)
at reactor.core.publisher.MonoLog.subscribe(MonoLog.java:51)
at reactor.core.publisher.Mono.subscribe(Mono.java:3589)
at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:85)
at reactor.core.publisher.MonoRetryWhen.subscribe(MonoRetryWhen.java:50)
at reactor.core.publisher.MonoDelaySubscription.accept(MonoDelaySubscription.java:49)
at reactor.core.publisher.MonoDelaySubscription.accept(MonoDelaySubscription.java:32)
at reactor.core.publisher.FluxDelaySubscription$DelaySubscriptionOtherSubscriber.onNext(FluxDelaySubscription.java:122)
at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onNext(FluxRetryPredicate.java:81)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:89)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:173)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:113)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:810)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:894)
at reactor.core.publisher.ReplayProcessor.onNext(ReplayProcessor.java:442)
at services.vivy.crm.config.LiiklusConfiguration.lambda$mainLoop$4(LiiklusConfiguration.java:157)
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:124)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:742)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:863)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:957)
at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:136)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onComplete(FluxRetryWhen.java:167)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
at com.salesforce.reactorgrpc.stub.SubscribeOnlyOnceLifter$1.onComplete(SubscribeOnlyOnceLifter.java:51)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:148)
at com.salesforce.reactorgrpc.stub.ClientCalls$1.onNext(ClientCalls.java:43)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:407)
at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:519)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializeReentrantCallsDirectExecutor.execute(SerializeReentrantCallsDirectExecutor.java:49)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.messagesAvailable(ClientCallImpl.java:536)
at io.grpc.internal.ForwardingClientStreamListener.messagesAvailable(ForwardingClientStreamListener.java:44)
at io.grpc.internal.AbstractStream$TransportState.messagesAvailable(AbstractStream.java:165)
at io.grpc.internal.MessageDeframer.processBody(MessageDeframer.java:408)
at io.grpc.internal.MessageDeframer.deliver(MessageDeframer.java:271)
at io.grpc.internal.MessageDeframer.deframe(MessageDeframer.java:177)
at io.grpc.internal.AbstractStream$TransportState.deframe(AbstractStream.java:193)
at io.grpc.internal.AbstractClientStream$TransportState.inboundDataReceived(AbstractClientStream.java:356)
at io.grpc.internal.Http2ClientStreamTransportState.transportDataReceived(Http2ClientStreamTransportState.java:147)
at io.grpc.netty.NettyClientStream$TransportState.transportDataReceived(NettyClientStream.java:306)
at io.grpc.netty.NettyClientHandler.onDataRead(NettyClientHandler.java:339)
at io.grpc.netty.NettyClientHandler.access$1000(NettyClientHandler.java:83)
at io.grpc.netty.NettyClientHandler$FrameListener.onDataRead(NettyClientHandler.java:722)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:236)
at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:421)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:118)
at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:390)
at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:450)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:628)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:482)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:844)

reactive-grpc version 0.9.0

mvn clean install gives error

I cloned the repo and ran mvn clean install on a MacBook Pro Java 1.8 and got the following test failure. mvn clean install -Dmaven.test.skip=true finishes correctly.

Failed tests:   servicesCanCallOtherServices(com.salesforce.reactorgrpc.ChainedCallIntegrationTest): VerifySubscriber timed out on reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber@4d79d187

Tests run: 44, Failures: 1, Errors: 0, Skipped: 0

Request timeout

How do I set a timeout for requests?
gRPC has a method for that, withDeadlineAfter(), but I can't apply it with your library.

Make *ImplBase classes stand-alone

Right now, the reactive *ImplBase classes are subclasses of the standard gRPC *ImplBase classes. This was done initially for expediency because gRPC *ImplBase class already implements BindableService.bindService(). The downside is that the reactive *ImplBase class has both reactive operations and gRPC StreamObserver operations, which is confusing.

Now that the project is maturing, the generated *ImplBases should correctly implement BindableService.bindService() themselves, rather than relying on gRPC's code.

Lambda expressions break Android

Could you explain how to add your library to an Android Studio project?

All classes were generated, but calling the stub causes a crash.
[Image: crash]
Multidex is enabled. MinSdk 21.
[Image: grpc]

BackpressureIntegrationTest failing

I'm running this on macOS Sierra 10.12.6 on a MacBook Pro with a 2.5 GHz Intel Core i7 and 16 GB of RAM. It passes if I set madMultipleCutoff = 1. Anything higher fails at least one of the tests.

Receive UNKNOWN: Exception was thrown by handler

Library version: 0.8.2

I'm using a bidirectional stream on Android.
Channel:
OkHttpChannelBuilder.forAddress(host, port)
.intercept(metadataClientInterceptor, grpcLoggingInterceptor)
.build()
But after receiving the first batch of values, I get UNKNOWN: Exception was thrown by handler.

My iOS colleagues have the same problem, but they solved it by using:
https://github.com/grpc/grpc/blob/master/src/objective-c/RxLibrary/GRXBufferedPipe.m

What am I doing wrong?


io.grpc.StatusRuntimeException: UNKNOWN: Exception was thrown by handler.
        at io.grpc.Status.asRuntimeException(Status.java:526)
        at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:419)
        at com.trading.interactor.common.grpc.GrpcLoggingInterceptor$interceptCall$1$start$1.onClose(GrpcLoggingInterceptor.kt:51)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:684)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:391)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:471)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:553)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:474)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:591)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
        at java.lang.Thread.run(Thread.java:761)

Test required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue random failure

The TCK test required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue fails randomly.

Stack trace:

required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(com.salesforce.rxgrpc.tck.RxGrpcPublisherManyToManyVerificationTest)  Time elapsed: 0.506 sec  <<< FAILURE!
java.lang.AssertionError: Async error during test execution: Queue is full?!
	at org.testng.Assert.fail(Assert.java:78)
	at org.reactivestreams.tck.TestEnvironment.verifyNoAsyncErrorsNoDelay(TestEnvironment.java:314)
	at org.reactivestreams.tck.TestEnvironment.verifyNoAsyncErrors(TestEnvironment.java:298)
	at org.reactivestreams.tck.TestEnvironment.verifyNoAsyncErrors(TestEnvironment.java:282)
	at org.reactivestreams.tck.PublisherVerification$30.run(PublisherVerification.java:1106)
	at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1138)
	at org.reactivestreams.tck.PublisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(PublisherVerification.java:1076)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:74)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:673)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:846)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1170)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
	at org.testng.TestRunner.runWorkers(TestRunner.java:1147)
	at org.testng.TestRunner.privateRun(TestRunner.java:749)
	at org.testng.TestRunner.run(TestRunner.java:600)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:317)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:312)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:274)
	at org.testng.SuiteRunner.run(SuiteRunner.java:223)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1039)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:964)
	at org.testng.TestNG.run(TestNG.java:900)
	at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:77)
	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeMulti(TestNGDirectoryTestSuite.java:159)
	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.execute(TestNGDirectoryTestSuite.java:99)
	at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:106)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
	at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
	at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
Caused by: io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:114)
	at com.salesforce.reactivegrpccommon.ReactiveStreamObserverPublisher.onNext(ReactiveStreamObserverPublisher.java:112)
	at com.salesforce.reactivegrpccommon.ReactiveConsumerStreamObserver.onNext(ReactiveConsumerStreamObserver.java:51)
	at com.salesforce.reactivegrpccommon.CancellableStreamObserver.onNext(CancellableStreamObserver.java:36)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:372)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:477)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:102)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:114)
	at com.salesforce.reactivegrpccommon.ReactiveStreamObserverPublisher.onNext(ReactiveStreamObserverPublisher.java:112)
	at com.salesforce.reactivegrpccommon.ReactiveConsumerStreamObserver.onNext(ReactiveConsumerStreamObserver.java:51)
	at com.salesforce.reactivegrpccommon.CancellableStreamObserver.onNext(CancellableStreamObserver.java:36)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:372)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:477)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:102)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Error signal not propagated because canceling subscription throws exception

Note: I'm not entirely sure whether the problem is on the grpc, reactive-grpc, or even reactor side. Let's see what you think.

Stack trace:

Exception in thread "app-thread" java.lang.IllegalStateException: call was cancelled
	at com.google.common.base.Preconditions.checkState(Preconditions.java:174)
	at io.grpc.internal.ClientCallImpl.halfClose(ClientCallImpl.java:419)
	at io.grpc.ForwardingClientCall.halfClose(ForwardingClientCall.java:47)
	at io.grpc.ForwardingClientCall.halfClose(ForwardingClientCall.java:47)
	at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onCompleted(ClientCalls.java:330)
	at com.salesforce.reactivegrpc.common.ReactivePublisherBackpressureOnReadyHandler.onComplete(ReactivePublisherBackpressureOnReadyHandler.java:131)
	at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123)
	at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:136)
	at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.onComplete(FluxTakeUntilOther.java:236)
	at reactor.core.publisher.FluxTakeUntilOther$TakeUntilOtherSubscriber.onComplete(FluxTakeUntilOther.java:112)
	at reactor.core.publisher.FluxTakeUntilOther$TakeUntilOtherSubscriber.onNext(FluxTakeUntilOther.java:94)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:808)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:892)
	at reactor.core.publisher.ReplayProcessor.onNext(ReplayProcessor.java:436)
	at reactor.core.publisher.MonoProcessor.drainLoop(MonoProcessor.java:504)
	at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:347)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.cancel(FluxDoFinally.java:145)
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.cancel(FluxFlattenIterable.java:271)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:1497)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:1466)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1278)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:1497)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:1466)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1278)
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.cancel(FluxRetryWhen.java:123)
	at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:147)
	at reactor.core.publisher.Operators.terminate(Operators.java:642)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:121)
	at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancelMain(FluxTakeUntilOther.java:173)
	at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancel(FluxTakeUntilOther.java:190)
	at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:147)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.cancel(FluxPublishOn.java:265)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:135)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:389)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:473)
	at reactor.core.scheduler.ExecutorScheduler$ExecutorTrackedRunnable.run(ExecutorScheduler.java:166)

One of my own subscribers throws an exception which is caught in

at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:135)

Before LambdaSubscriber propagates the error downstream, it calls cancel() on the subscription. That cancel() call throws the problematic exception.

I think the interesting part is here:

at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onCompleted(ClientCalls.java:330)
at com.salesforce.reactivegrpc.common.ReactivePublisherBackpressureOnReadyHandler.onComplete(ReactivePublisherBackpressureOnReadyHandler.java:131)

ReactivePublisherBackpressureOnReadyHandler calls CallStreamObserver.onCompleted(), which presumably should not throw, since its javadoc does not mention any exception. CallStreamObserver in turn calls ClientCall.halfClose(), which is documented as follows:

void io.grpc.ClientCall.halfClose()

Close the call for request message sending. Incoming response messages are unaffected. This should be called when no more messages will be sent from the client.
Throws: IllegalStateException - if call is already halfClose()d or canceled

I'm not sure whether grpc should stop throwing this exception or should document it on CallStreamObserver.onCompleted(). Either way, I think reactive-grpc should catch the exception?
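
The suggestion to catch the exception could look roughly like the sketch below. It is only an illustration: CallObserver is a hypothetical stand-in for the gRPC observer type, and completeQuietly is an invented helper name, not something that exists in reactive-grpc.

```java
class SafeComplete {
    /** Hypothetical stand-in for the gRPC call observer; not the real io.grpc type. */
    interface CallObserver {
        void onCompleted();
    }

    /**
     * Completes the observer, treating "already cancelled or half-closed" as a no-op.
     * Returns true if completion was delivered, false if the call was already finished.
     */
    static boolean completeQuietly(CallObserver observer) {
        try {
            observer.onCompleted();
            return true;
        } catch (IllegalStateException e) {
            // For example "call was cancelled": there is nothing left to half-close,
            // so the exception is swallowed instead of breaking the cancel() chain.
            return false;
        }
    }
}
```

Swallowing the exception at this point would let the cancel() call return normally, so the subscriber's original error could still be propagated.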

Memory synchronisation problem

I noticed that there is no memory synchronisation done when calling onNext on CallStreamObserver here. According to the gRPC docs these objects are not thread safe and memory synchronisation is the responsibility of the callers should multiple threads be involved. Is there something I might be missing here?
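
For illustration, one way a caller could serialize access to a non-thread-safe observer is sketched below. Sink is a hypothetical stand-in for the observer type, and nothing here claims that reactive-grpc needs or uses this wrapper. It only shows the kind of synchronisation the question is about.

```java
import java.util.ArrayList;
import java.util.List;

class SerializedSinkDemo {
    /** Hypothetical stand-in for a non-thread-safe observer; not the real io.grpc type. */
    interface Sink<T> {
        void onNext(T value);
    }

    /** Wraps a sink so that onNext calls are mutually exclusive and their writes are visible across threads. */
    static <T> Sink<T> serialized(Sink<T> delegate) {
        final Object lock = new Object();
        return value -> {
            synchronized (lock) {
                delegate.onNext(value);
            }
        };
    }

    /** Pushes values from several threads into a plain ArrayList through the wrapper. */
    static int run(int threads, int perThread) {
        List<Integer> unsafe = new ArrayList<>();          // not thread safe on its own
        Sink<Integer> sink = serialized(v -> unsafe.add(v));
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    sink.onNext(i);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();                                   // join makes the workers' writes visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return unsafe.size();
    }
}
```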

Deprecated API in grpc 1.9.0

The "XxxServiceGrpc.METHOD_YYY" static fields have been deprecated in the recent grpc releases and replaced with methods. The change is explained here:
grpc/grpc-java#1901

To reproduce, update grpc to 1.9.0 and generate stubs for any service. The generated code will produce deprecation warnings during compilation.

Code generator uses proto directory hierarchy as part of the outer class name

In Rx*Grpc.java files, the rxgrpc code generator produces wrong package paths when referring to generated message classes. Here is an example method. Notice that the qualified name of HeartbeatRequest includes slashes:

public io.reactivex.Single<com.google.protobuf.Empty> heartbeat(io.reactivex.Single<com.example.v1.Com/example/v1/frontend.HeartbeatRequest> rxRequest) {
    return com.salesforce.rxgrpc.stub.ClientCalls.oneToOne(rxRequest, delegateStub::heartbeat);
}

The grpc equivalent:

public void heartbeat(com.example.v1.FrontendOuterClass.HeartbeatRequest request,
    io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
  asyncUnimplementedUnaryCall(getHeartbeatMethod(), responseObserver);
}

The protocol buffers specification looks like this:

syntax = "proto3";

package com.example.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

service Frontend {
	rpc Heartbeat(HeartbeatRequest) returns (google.protobuf.Empty);
}
message HeartbeatRequest {
	google.protobuf.Timestamp timestamp = 1;
}
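
The difference between the two generated names can be illustrated with a simplified sketch of how an outer class name might be derived from the proto file path. Several things are assumptions here: the file path com/example/v1/frontend.proto is guessed from the broken name above, the helper outerClassName is hypothetical, and the camel-casing rule is a simplification of what protoc actually does.

```java
import java.util.Set;

class OuterClassNames {
    /**
     * Derives an outer class name from a proto file path (simplified sketch).
     * Only the file's base name is used. "OuterClass" is appended when the result
     * collides with a top-level message, enum, or service name in the file.
     */
    static String outerClassName(String protoPath, Set<String> topLevelNames) {
        // Drop the directory part: "com/example/v1/frontend.proto" -> "frontend.proto"
        String base = protoPath.substring(protoPath.lastIndexOf('/') + 1);
        // Drop the extension: "frontend.proto" -> "frontend"
        int dot = base.lastIndexOf('.');
        if (dot >= 0) {
            base = base.substring(0, dot);
        }
        // Upper camel case, treating '_' and '-' as word separators
        StringBuilder name = new StringBuilder();
        boolean upperNext = true;
        for (char c : base.toCharArray()) {
            if (c == '_' || c == '-') {
                upperNext = true;
            } else if (upperNext) {
                name.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                name.append(c);
            }
        }
        String result = name.toString();
        return topLevelNames.contains(result) ? result + "OuterClass" : result;
    }

    public static void main(String[] args) {
        System.out.println(outerClassName(
                "com/example/v1/frontend.proto", Set.of("Frontend", "HeartbeatRequest")));
    }
}
```

Under these assumptions the service name Frontend collides with the file's base name, which would explain the FrontendOuterClass name in the grpc output. The broken rxgrpc name looks as if the directory part of the path was never stripped.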

I tested code generation in both a Java project and an Android project, with the java and javalite code generators on each. Here are two example build.gradle files:

  • Java project, IntelliJ IDEA, java code generator:
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.3'
    }
}


plugins {
    id "java"
    id "com.google.protobuf" version "0.8.3"
    id "idea"
    id "application"
}

version '1.0'

sourceCompatibility = 1.8
targetCompatibility = 1.8

mainClassName = "Client"

repositories {
    mavenCentral()
}

def grpcVersion = '1.9.0'
def reactiveGrpcVersion = '0.8.0'

dependencies {
    compile 'com.google.protobuf:protobuf-java:3.5.1'
    compile "io.grpc:grpc-okhttp:${grpcVersion}"
    compile "io.grpc:grpc-stub:${grpcVersion}"
    compile "io.grpc:grpc-protobuf:${grpcVersion}"
    compile "com.salesforce.servicelibs:rxgrpc-stub:${reactiveGrpcVersion}"
}

jar {
    manifest {
        attributes 'Main-Class': 'Client'
    }
}

sourceSets {
    main {
        proto {
            srcDir '../kidswatch-watch-appscomm/protocol'
        }
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/javalite'
        }
    }
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.5.1-1"
    }

    plugins {
        grpc {
            artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
        }
        rxgrpc {
            artifact = "com.salesforce.servicelibs:rxgrpc:${reactiveGrpcVersion}:jdk8@jar"
        }
    }

    generateProtoTasks {
        all().each { task ->
            task.plugins {
                grpc {}
                rxgrpc {}
            }
        }
    }
}
  • Android library module, Android Studio, javalite code generator:
buildscript {
    ext.grpc_version = '1.9.1'
    ext.rxgrpc_version = '0.8.0'

    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.4'
    }
}

apply plugin: 'com.android.library'
apply plugin: 'com.google.protobuf'

android {
    compileSdkVersion 26

    defaultConfig {
        minSdkVersion 19
        targetSdkVersion 26
        versionCode 1
        versionName "1.0"
    }

    sourceSets {
        main {
            proto {
                srcDir '../server-protocol-specification/protocol'
            }
        }
        debug {
            java {
                srcDirs 'build/generated/source/proto/debug/grpc'
                srcDirs 'build/generated/source/proto/debug/rxgrpc'
                srcDirs 'build/generated/source/proto/debug/javalite'
            }
        }
        release {
            java {
                srcDirs 'build/generated/source/proto/release/grpc'
                srcDirs 'build/generated/source/proto/release/rxgrpc'
                srcDirs 'build/generated/source/proto/release/javalite'
            }
        }
    }

    lintOptions {
        lintConfig file("../lint.xml")
    }
}

protobuf {
    protoc {
        artifact = 'com.google.protobuf:protoc:3.5.1-1'
    }
    plugins {
        javalite {
            // The code generator for lite comes as a separate artifact
            artifact = "com.google.protobuf:protoc-gen-javalite:3.0.0"
        }
        grpc {
            artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version"
        }
        rxgrpc {
            artifact = "com.salesforce.servicelibs:rxgrpc:$rxgrpc_version:jdk8@jar"
        }
        sourceSet
    }
    generateProtoTasks {
        all().each { task ->
            task.builtins {
                // In most cases you don't need the full Java output if you use the lite output.
                remove java
            }
            task.plugins {
                javalite {}
                grpc {
                    // Options added to --grpc_out
                    option 'lite'
                }
                rxgrpc {
                }
            }
        }
    }
}

dependencies {
    // You need to build grpc-java to obtain these libraries below.
    api "io.grpc:grpc-okhttp:$grpc_version"
    api "io.grpc:grpc-protobuf-lite:$grpc_version"
    api "io.grpc:grpc-stub:$grpc_version"
    api 'javax.annotation:javax.annotation-api:1.2'
    api "com.salesforce.servicelibs:rxgrpc-stub:$rxgrpc_version"
}

Wrong uppercasing of method names

The method we use:

        public String methodNameUpperUnderscore() {
            return CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_UNDERSCORE, methodName);
        }

doesn't match the one used by grpc-java.

The proto method CreateEntityWithDTO becomes CREATE_ENTITY_WITH_D_T_O instead of CREATE_ENTITY_WITH_DTO.
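
A minimal sketch of a conversion that produces the expected name follows. It assumes the rule is to insert an underscore only where a lowercase letter is followed by an uppercase one. That rule is inferred from the expected output above rather than confirmed against the grpc-java sources, and the class and method names are hypothetical.

```java
class MethodNames {
    /**
     * Converts a method name to UPPER_UNDERSCORE, keeping runs of capitals together.
     * An underscore is inserted only between a lowercase letter and a following uppercase letter.
     */
    static String toUpperUnderscore(String methodName) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < methodName.length(); i++) {
            char c = methodName.charAt(i);
            out.append(Character.toUpperCase(c));
            boolean hasNext = i + 1 < methodName.length();
            if (hasNext && Character.isLowerCase(c)
                    && Character.isUpperCase(methodName.charAt(i + 1))) {
                out.append('_');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(toUpperUnderscore("CreateEntityWithDTO"));
    }
}
```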

Underscores in RPC name break code generation

This proto doesn't generate compiling code.

syntax = "proto3";

package com.example.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

service Settings {
    rpc SettingsGet_Classic_1 (google.protobuf.Empty) returns (Settings_Classic_1);
}

message Settings_Classic_1 {
    google.protobuf.Timestamp timestamp = 1;
}

Generated calls to delegateStub::settingsGet_Classic_1 should be delegateStub::settingsGetClassic1.
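
The expected mapping can be sketched as follows, assuming the rule is to lowercase the first character, drop each underscore, and uppercase the character that follows it. The helper name toJavaMethodName is hypothetical, and the rule is inferred from the example above.

```java
class RpcMethodNames {
    /**
     * Converts an RPC name to a Java method name: first character lowercased,
     * underscores removed, and the character after each underscore uppercased.
     */
    static String toJavaMethodName(String rpcName) {
        StringBuilder out = new StringBuilder();
        boolean afterUnderscore = false;
        for (int i = 0; i < rpcName.length(); i++) {
            char c = rpcName.charAt(i);
            if (i == 0) {
                out.append(Character.toLowerCase(c));
            } else if (c == '_') {
                afterUnderscore = true;   // remember to capitalize the next character
            } else {
                out.append(afterUnderscore ? Character.toUpperCase(c) : c);
                afterUnderscore = false;
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(toJavaMethodName("SettingsGet_Classic_1"));
    }
}
```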

Java ➜ C# Interop Testing

Write a gRPC server in C# and use Reactive gRPC to call it. Make sure the following work correctly.

  • One-to-one
  • One-to-many
  • Many-to-one
  • Many-to-many

Cannot add Interceptor to delegate stub

As of now, it doesn't seem possible to manipulate the delegateStub inside the generated ServiceStub.
As an example, I would like to add an Interceptor to log objects passing on the stream. Is there any other way to do so?

Generated stub:

public static final class RxSessionServiceStub {
        private SessionServiceGrpc.SessionServiceStub delegateStub;

        private RxSessionServiceStub(io.grpc.Channel channel) {
            delegateStub = SessionServiceGrpc.newStub(channel);
        }

        /**
         * <pre>
         *  Accepts a stream of Actions and sends back a stream of Comands
         * <pre>
         */
        public io.reactivex.Flowable<session.Session.Command> startSession(io.reactivex.Flowable<session.Session.Action> rxRequest) {
            return com.salesforce.rxgrpc.stub.ClientCalls.manyToMany(rxRequest, delegateStub::startSession);
        }

    }

What I'd like to do:

public static final class RxSessionServiceStub {
        private SessionServiceGrpc.SessionServiceStub delegateStub;

        private RxSessionServiceStub(io.grpc.Channel channel) {
            delegateStub = SessionServiceGrpc.newStub(channel).withInterceptors(interceptor);
        }

        /**
         * <pre>
         *  Accepts a stream of Actions and sends back a stream of Comands
         * <pre>
         */
        public io.reactivex.Flowable<session.Session.Command> startSession(io.reactivex.Flowable<session.Session.Action> rxRequest) {
            return com.salesforce.rxgrpc.stub.ClientCalls.manyToMany(rxRequest, delegateStub::startSession);
        }

    }

Generated code is not compilable in demos/gradle

I checked out the demos/gradle project and tried to build it.
I ran ./gradlew generateTestProto, which leads to compile errors:

$ ./gradlew generateTestProto
:extractIncludeProto
:extractProto UP-TO-DATE
:generateProto
:compileJava
.../reactive-grpc/demos/gradle/build/generated/source/proto/main/rxgrpc/com/salesforce/servicelibs/reactivegrpc/RxGreeterGrpc.java:94: error: cannot find symbol
                            com.salesforce.servicelibs.reactivegrpc.GreeterGrpc.getSayHelloMethod(),
                                                                               ^
  symbol:   method getSayHelloMethod()
  location: class GreeterGrpc
.../reactive-grpc/demos/gradle/build/generated/source/proto/main/reactor/com/salesforce/servicelibs/reactivegrpc/ReactorGreeterGrpc.java:82: error: cannot find symbol
                            com.salesforce.servicelibs.reactivegrpc.GreeterGrpc.getSayHelloMethod(),
                                                                               ^
  symbol:   method getSayHelloMethod()
  location: class GreeterGrpc
2 errors
:compileJava FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':compileJava'.
> Compilation failed; see the compiler error output for details.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

BUILD FAILED

I changed the versions to the following, which seemed to work.

def reactiveGrpcVersion = '0.9.0'
def grpcVersion = '1.13.2'
def protobufVersion = '3.5.1'

Ready for production?

Hi, we are considering using this library in our grpc application. What is its current status, and is it ready to be used in production?

Backpressure is not respected

Backpressure is not respected and the server seems to get additional requests which are not coming from the client.

ERROR 2018-03-20 09:48:01,775 [grpc-default-executor-0] BackpressureClient - server -> client onError
reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:228)
	at com.salesforce.reactivegrpc.common.ReactiveStreamObserverPublisher.onNext(ReactiveStreamObserverPublisher.java:112)
	at com.salesforce.reactivegrpc.common.ReactiveConsumerStreamObserver.onNext(ReactiveConsumerStreamObserver.java:51)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:406)
	at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
	at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:36)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:530)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

This is where the server side requests are coming from (non-thrown exception to print stack trace).

java.lang.Exception: REQ 1
	at backpressure.BackpressureServer$1.lambda$5(BackpressureServer.java:45)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:132)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
	at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138)
	at com.salesforce.reactivegrpc.common.ReactivePublisherBackpressureOnReadyHandler.onNext(ReactivePublisherBackpressureOnReadyHandler.java:118)
	at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198)
	at reactor.core.publisher.FluxGenerate$GenerateSubscription.next(FluxGenerate.java:164)
	at backpressure.BackpressureServer$1.lambda$2(BackpressureServer.java:38)
	at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:257)
	at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:199)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
	at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138)
	at com.salesforce.reactivegrpc.common.ReactivePublisherBackpressureOnReadyHandler.run(ReactivePublisherBackpressureOnReadyHandler.java:85)
	at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onReady(ServerCalls.java:191)
	at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:176)
	at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:272)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:653)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

I have attached a test server and client that reproduce this. Start the client with -Dreactor.bufferSize.small=10 to reproduce it more easily (the default is 256, I think). This property sets the buffer size of FluxPublishOn$PublishOnSubscriber, which is where the exception is thrown.

backpressure.zip

If you comment out the sub.get().request(1); line in the client-side onNext callback, it is easy to see that the server receives more requests than the single one the client actually issued in its onSubscribe callback.
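
The client-side demand pattern described here (request one item in onSubscribe, then one more after each onNext) can be sketched with the JDK's own Flow API. This is not the attached reproducer and does not involve gRPC or Reactor. The class names are hypothetical, and SubmissionPublisher stands in for a well-behaved source.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class OneAtATimeDemo {

    /** Subscriber that never has more than one outstanding request. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                // initial demand: exactly one item
        }

        @Override public void onNext(Integer item) {
            received.add(item);
            subscription.request(1);     // ask for the next item only after handling this one
        }

        @Override public void onError(Throwable t) { done.countDown(); }

        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 1..count and returns what the subscriber received. */
    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);     // blocks if the subscriber's buffer is full
            }
        }                                // close() signals onComplete after pending items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

With a source that honours demand, removing the request(1) call in onNext would leave the subscriber with exactly one item. The report above says the server instead sees more requests than that.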

Best way to retry on a reactive-grpc stream after an error

Hello,

I have a pretty simple use case: a gRPC service with a method taking a stream as argument and returning a stream.

I would like to resubscribe if, for example, the remote server goes down.
On a normal stream I would just use retry (with some delay) to resubscribe on error:

Flux<ResultType> resultFlux = myService.myMethod(sourceStream).retry();

However it is not possible with reactive-grpc:

Exception in thread "grpc-default-executor-0" java.lang.NullPointerException: You cannot directly subscribe to a gRPC service multiple times concurrently. Use Flowable.share() instead.
	at com.salesforce.reactorgrpc.stub.SubscribeOnlyOnceLifter$1.onSubscribe(SubscribeOnlyOnceLifter.java:32)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onSubscribe(FluxPublishOn.java:202)

(btw, this should be Flux.share() instead of Flowable.share())

Do you have any elegant way to achieve this retry on error?

A naive approach that uses recursion to resume the stream will of course result in a stack overflow after some time:

Flux<ResultType> getResultStream(Flux<SourceType> sourceStream){
        return myService.myMethod(sourceStream)
                .onErrorResume(err -> Flux.defer(() -> getResultStream(sourceStream)));
}

The only partially working approach I found is to chain it with thenMany to avoid a stack overflow, but then it is not possible to subscribe to the result stream, since thenMany waits for the completion of the previous stream:

Flux<ResultType> getResultStream(Flux<SourceType> sourceStream){
        return myService.myMethod(sourceStream)
                 .onErrorResume(err -> Flux.empty())
                 .thenMany(Flux.defer(() -> getResultStream(sourceStream)));
}

Customize delegateStub

The current implementation just creates a new stub from the provided channel, which leaves no non-hacky way to customize the original stub (for example, to enable compression).

It would be nice to provide a more complex constructor, a builder, or any other option to allow some full control over the delegateStub.

Kotlin demo

I had some fun converting one of the demos to Kotlin.
I think it would be nice to use Kotlin in the Reactor demo, since the generated code is perfectly compatible.

@rmichela WDYT ?

Code generation is incorrect for nested message types

The following .proto file doesn't generate compiling code (see screenshot below):

syntax = "proto3";

package pkg.test;

service TestService {
    rpc test (InnerMessage.TestRequest) returns (InnerMessage.TestResponse);
}

message InnerMessage {
    message TestRequest {
        string data = 1;
    }

    message TestResponse {
        string data = 1;
        string error = 2;
    }
}

screenshot from 2018-07-04 05-24-43

To make generation work again, it's necessary to move TestRequest and TestResponse out of InnerMessage.

Here is my module build.gradle:

apply plugin: 'com.android.application'
apply plugin: 'kotlin-android'
apply plugin: 'kotlin-android-extensions'
apply plugin: 'com.google.protobuf'
apply plugin: 'kotlin-kapt'

android {
    compileSdkVersion 27
    sourceSets {
        main {
            proto {
                srcDir 'src/main/api'
            }
        }
    }
    defaultConfig {
        applicationId "com.example.grpcerror"
        minSdkVersion 22
        targetSdkVersion 27
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
    }
    buildTypes {
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}

ext {
    protobufVersion = '3.5.1'
    grpcVersion = '1.12.0'
    reactiveGrpcVersion = '0.8.2'
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:${protobufVersion}"
    }
    clean {
        delete protobuf.generatedFilesBaseDir
    }
    plugins {
        javalite {
            artifact = "com.google.protobuf:protoc-gen-javalite:3.0.0"
        }
        grpc {
            artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
        }
        rxgrpc {
            artifact = "com.salesforce.servicelibs:rxgrpc:${reactiveGrpcVersion}:jdk8@jar"
        }
    }
    generateProtoTasks {
        all().each { task ->
            task.plugins {
                javalite {}
                grpc {
                    option 'lite'
                }
                rxgrpc {}
            }
        }
    }
}

dependencies {
    implementation fileTree(dir: 'libs', include: ['*.jar'])
    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
    implementation 'com.android.support:appcompat-v7:27.1.1'
    testImplementation 'junit:junit:4.12'
    androidTestImplementation 'com.android.support.test:runner:1.0.2'
    androidTestImplementation 'com.android.support.test.espresso:espresso-core:3.0.2'

    implementation "io.grpc:grpc-okhttp:${grpcVersion}"
    implementation "io.grpc:grpc-stub:${grpcVersion}"
    implementation "io.grpc:grpc-protobuf-lite:${grpcVersion}"
    implementation "com.salesforce.servicelibs:rxgrpc-stub:${reactiveGrpcVersion}"
    implementation "javax.annotation:jsr250-api:1.0"
}

A sample project that demonstrates this issue can be found at https://github.com/AntKos/rx-grpc-sample

Warning on Android Studio 3.1 gradle when generating proto

After upgrading to Android Studio 3.1 and a new version of Gradle, the warning below occurs:

dependencies:

com.android.tools.build:gradle:3.1.0
com.google.protobuf:protobuf-gradle-plugin:0.8.3

Gradle version: gradle-4.4-all

> Task :app:generateDebugProto Using TaskInputs.file() with something that doesn't resolve to a File object has been deprecated and is scheduled to be removed in Gradle 5.0. Use TaskInputs.files() instead. at org.gradle.api.internal.tasks.DefaultTaskInputs$8.validate(DefaultTaskInputs.java:387) at org.gradle.api.internal.tasks.StaticValue.validate(StaticValue.java:43) at org.gradle.api.internal.tasks.DefaultTaskInputFilePropertySpec.validate(DefaultTaskInputFilePropertySpec.java:124) at org.gradle.api.internal.tasks.DefaultTaskInputs.validate(DefaultTaskInputs.java:171) at org.gradle.api.internal.tasks.execution.ValidatingTaskExecuter.execute(ValidatingTaskExecuter.java:47) at org.gradle.api.internal.tasks.execution.SkipEmptySourceFilesTaskExecuter.execute(SkipEmptySourceFilesTaskExecuter.java:97) at org.gradle.api.internal.tasks.execution.CleanupStaleOutputsExecuter.execute(CleanupStaleOutputsExecuter.java:87) at org.gradle.api.internal.tasks.execution.ResolveTaskArtifactStateTaskExecuter.execute(ResolveTaskArtifactStateTaskExecuter.java:52) at org.gradle.api.internal.tasks.execution.SkipTaskWithNoActionsExecuter.execute(SkipTaskWithNoActionsExecuter.java:52) at org.gradle.api.internal.tasks.execution.SkipOnlyIfTaskExecuter.execute(SkipOnlyIfTaskExecuter.java:54) at org.gradle.api.internal.tasks.execution.ExecuteAtMostOnceTaskExecuter.execute(ExecuteAtMostOnceTaskExecuter.java:43) at org.gradle.api.internal.tasks.execution.CatchExceptionTaskExecuter.execute(CatchExceptionTaskExecuter.java:34) at org.gradle.execution.taskgraph.DefaultTaskGraphExecuter$EventFiringTaskWorker$1.run(DefaultTaskGraphExecuter.java:248) at org.gradle.internal.progress.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:336) at org.gradle.internal.progress.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:328) at 
org.gradle.internal.progress.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:199) at org.gradle.internal.progress.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:110) at org.gradle.execution.taskgraph.DefaultTaskGraphExecuter$EventFiringTaskWorker.execute(DefaultTaskGraphExecuter.java:241) at org.gradle.execution.taskgraph.DefaultTaskGraphExecuter$EventFiringTaskWorker.execute(DefaultTaskGraphExecuter.java:230) at org.gradle.execution.taskgraph.DefaultTaskPlanExecutor$TaskExecutorWorker.processTask(DefaultTaskPlanExecutor.java:123) at org.gradle.execution.taskgraph.DefaultTaskPlanExecutor$TaskExecutorWorker.access$200(DefaultTaskPlanExecutor.java:79) at org.gradle.execution.taskgraph.DefaultTaskPlanExecutor$TaskExecutorWorker$1.execute(DefaultTaskPlanExecutor.java:104) at org.gradle.execution.taskgraph.DefaultTaskPlanExecutor$TaskExecutorWorker$1.execute(DefaultTaskPlanExecutor.java:98) at org.gradle.execution.taskgraph.DefaultTaskExecutionPlan.execute(DefaultTaskExecutionPlan.java:626) at org.gradle.execution.taskgraph.DefaultTaskExecutionPlan.executeWithTask(DefaultTaskExecutionPlan.java:581) at org.gradle.execution.taskgraph.DefaultTaskPlanExecutor$TaskExecutorWorker.run(DefaultTaskPlanExecutor.java:98) at org.gradle.execution.taskgraph.DefaultTaskPlanExecutor.process(DefaultTaskPlanExecutor.java:59) at org.gradle.execution.taskgraph.DefaultTaskGraphExecuter.execute(DefaultTaskGraphExecuter.java:128) at org.gradle.execution.SelectedTaskExecutionAction.execute(SelectedTaskExecutionAction.java:37) at org.gradle.execution.DefaultBuildExecuter.execute(DefaultBuildExecuter.java:37) at org.gradle.execution.DefaultBuildExecuter.access$000(DefaultBuildExecuter.java:23) at org.gradle.execution.DefaultBuildExecuter$1.proceed(DefaultBuildExecuter.java:43) at org.gradle.execution.DryRunBuildExecutionAction.execute(DryRunBuildExecutionAction.java:46) at 
org.gradle.execution.DefaultBuildExecuter.execute(DefaultBuildExecuter.java:37) at org.gradle.execution.DefaultBuildExecuter.execute(DefaultBuildExecuter.java:30) at org.gradle.initialization.DefaultGradleLauncher$ExecuteTasks.run(DefaultGradleLauncher.java:314) at org.gradle.internal.progress.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:336) at org.gradle.internal.progress.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:328) at org.gradle.internal.progress.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:199) at org.gradle.internal.progress.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:110) at org.gradle.initialization.DefaultGradleLauncher.runTasks(DefaultGradleLauncher.java:204) at org.gradle.initialization.DefaultGradleLauncher.doBuildStages(DefaultGradleLauncher.java:134) at org.gradle.initialization.DefaultGradleLauncher.executeTasks(DefaultGradleLauncher.java:109) at org.gradle.internal.invocation.GradleBuildController$1.call(GradleBuildController.java:78) at org.gradle.internal.invocation.GradleBuildController$1.call(GradleBuildController.java:75) at org.gradle.internal.work.DefaultWorkerLeaseService.withLocks(DefaultWorkerLeaseService.java:152) at org.gradle.internal.invocation.GradleBuildController.doBuild(GradleBuildController.java:100) at org.gradle.internal.invocation.GradleBuildController.run(GradleBuildController.java:75) at org.gradle.tooling.internal.provider.ExecuteBuildActionRunner.run(ExecuteBuildActionRunner.java:28) at org.gradle.launcher.exec.ChainingBuildActionRunner.run(ChainingBuildActionRunner.java:35) at org.gradle.tooling.internal.provider.ValidatingBuildActionRunner.run(ValidatingBuildActionRunner.java:32) at org.gradle.launcher.exec.RunAsBuildOperationBuildActionRunner$1.run(RunAsBuildOperationBuildActionRunner.java:43) at 
org.gradle.internal.progress.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:336) at org.gradle.internal.progress.DefaultBuildOperationExecutor$RunnableBuildOperationWorker.execute(DefaultBuildOperationExecutor.java:328) at org.gradle.internal.progress.DefaultBuildOperationExecutor.execute(DefaultBuildOperationExecutor.java:199) at org.gradle.internal.progress.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:110) at org.gradle.launcher.exec.RunAsBuildOperationBuildActionRunner.run(RunAsBuildOperationBuildActionRunner.java:40) at org.gradle.tooling.internal.provider.SubscribableBuildActionRunner.run(SubscribableBuildActionRunner.java:51) at org.gradle.launcher.exec.InProcessBuildActionExecuter.execute(InProcessBuildActionExecuter.java:47) at org.gradle.launcher.exec.InProcessBuildActionExecuter.execute(InProcessBuildActionExecuter.java:30) at org.gradle.launcher.exec.BuildTreeScopeBuildActionExecuter.execute(BuildTreeScopeBuildActionExecuter.java:39) at org.gradle.launcher.exec.BuildTreeScopeBuildActionExecuter.execute(BuildTreeScopeBuildActionExecuter.java:25) at org.gradle.tooling.internal.provider.ContinuousBuildActionExecuter.execute(ContinuousBuildActionExecuter.java:80) at org.gradle.tooling.internal.provider.ContinuousBuildActionExecuter.execute(ContinuousBuildActionExecuter.java:53) at org.gradle.tooling.internal.provider.ServicesSetupBuildActionExecuter.execute(ServicesSetupBuildActionExecuter.java:57) at org.gradle.tooling.internal.provider.ServicesSetupBuildActionExecuter.execute(ServicesSetupBuildActionExecuter.java:32) at org.gradle.tooling.internal.provider.GradleThreadBuildActionExecuter.execute(GradleThreadBuildActionExecuter.java:36) at org.gradle.tooling.internal.provider.GradleThreadBuildActionExecuter.execute(GradleThreadBuildActionExecuter.java:25) at 
org.gradle.tooling.internal.provider.ParallelismConfigurationBuildActionExecuter.execute(ParallelismConfigurationBuildActionExecuter.java:43) at org.gradle.tooling.internal.provider.ParallelismConfigurationBuildActionExecuter.execute(ParallelismConfigurationBuildActionExecuter.java:29) at org.gradle.tooling.internal.provider.StartParamsValidatingActionExecuter.execute(StartParamsValidatingActionExecuter.java:69) at org.gradle.tooling.internal.provider.StartParamsValidatingActionExecuter.execute(StartParamsValidatingActionExecuter.java:30) at org.gradle.tooling.internal.provider.SessionFailureReportingActionExecuter.execute(SessionFailureReportingActionExecuter.java:59) at org.gradle.tooling.internal.provider.SessionFailureReportingActionExecuter.execute(SessionFailureReportingActionExecuter.java:44) at org.gradle.tooling.internal.provider.SetupLoggingActionExecuter.execute(SetupLoggingActionExecuter.java:45) at org.gradle.tooling.internal.provider.SetupLoggingActionExecuter.execute(SetupLoggingActionExecuter.java:30) at org.gradle.launcher.daemon.server.exec.ExecuteBuild.doBuild(ExecuteBuild.java:67) at org.gradle.launcher.daemon.server.exec.BuildCommandOnly.execute(BuildCommandOnly.java:36) at org.gradle.launcher.daemon.server.api.DaemonCommandExecution.proceed(DaemonCommandExecution.java:122) at org.gradle.launcher.daemon.server.exec.WatchForDisconnection.execute(WatchForDisconnection.java:37) at org.gradle.launcher.daemon.server.api.DaemonCommandExecution.proceed(DaemonCommandExecution.java:122) at org.gradle.launcher.daemon.server.exec.ResetDeprecationLogger.execute(ResetDeprecationLogger.java:26) at org.gradle.launcher.daemon.server.api.DaemonCommandExecution.proceed(DaemonCommandExecution.java:122) at org.gradle.launcher.daemon.server.exec.RequestStopIfSingleUsedDaemon.execute(RequestStopIfSingleUsedDaemon.java:34) at org.gradle.launcher.daemon.server.api.DaemonCommandExecution.proceed(DaemonCommandExecution.java:122) at 
org.gradle.launcher.daemon.server.exec.ForwardClientInput$2.call(ForwardClientInput.java:74) at org.gradle.launcher.daemon.server.exec.ForwardClientInput$2.call(ForwardClientInput.java:72) at org.gradle.util.Swapper.swap(Swapper.java:38) at org.gradle.launcher.daemon.server.exec.ForwardClientInput.execute(ForwardClientInput.java:72) at org.gradle.launcher.daemon.server.api.DaemonCommandExecution.proceed(DaemonCommandExecution.java:122) at org.gradle.launcher.daemon.server.exec.LogAndCheckHealth.execute(LogAndCheckHealth.java:55) at org.gradle.launcher.daemon.server.api.DaemonCommandExecution.proceed(DaemonCommandExecution.java:122) at org.gradle.launcher.daemon.server.exec.LogToClient.doBuild(LogToClient.java:62) at org.gradle.launcher.daemon.server.exec.BuildCommandOnly.execute(BuildCommandOnly.java:36) at org.gradle.launcher.daemon.server.api.DaemonCommandExecution.proceed(DaemonCommandExecution.java:122) at org.gradle.launcher.daemon.server.exec.EstablishBuildEnvironment.doBuild(EstablishBuildEnvironment.java:82) at org.gradle.launcher.daemon.server.exec.BuildCommandOnly.execute(BuildCommandOnly.java:36) at org.gradle.launcher.daemon.server.api.DaemonCommandExecution.proceed(DaemonCommandExecution.java:122) at org.gradle.launcher.daemon.server.exec.StartBuildOrRespondWithBusy$1.run(StartBuildOrRespondWithBusy.java:50) at org.gradle.launcher.daemon.server.DaemonStateCoordinator$1.run(DaemonStateCoordinator.java:295) at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63) at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55) at java.lang.Thread.run(Thread.java:748) A problem was found with the configuration of task 
':app:generateDebugProto'. Registering invalid inputs and outputs via TaskInputs and TaskOutputs methods has been deprecated and is scheduled to be removed in Gradle 5.0.

backpressure-demo "queue full?!" exception with concatMap

Instead of using

.zipWith(Flowable.interval(3, TimeUnit.MILLISECONDS), (item, interval) -> item)

to simulate a slow consumer, I tried

.concatMap(item -> Flowable.just(item).delay(3, TimeUnit.MILLISECONDS))

to reproduce the "queue full?!" error [1] we encountered in our project. I can get around the issue by using rebatchRequests(1024) before concatMap, but I don't understand exactly why concatMap does not work without it.

I'm aware that this issue may not be directly related to this project, but I would be very grateful for any answers or hints that help shed some light on it.

Cheers, Cédric

Link to demo: https://github.com/salesforce/reactive-grpc/blob/master/demos/backpressure-demo/src/main/java/demo/backpressure/BackpressureController.java#L57
[1] Full stack trace:

java.lang.IllegalStateException: Queue full?!
	at io.reactivex.internal.operators.flowable.FlowableConcatMap$BaseConcatMapSubscriber.onNext(FlowableConcatMap.java:155)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69)
	at com.salesforce.rxgrpc.stub.SubscribeOnlyOnceFlowableOperator$1.onNext(SubscribeOnlyOnceFlowableOperator.java:41)
	at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
	at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
	at com.salesforce.reactivegrpc.common.ReactiveBackpressureChunker$1.onNext(ReactiveBackpressureChunker.java:71)
	at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
	at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
	at com.salesforce.reactivegrpc.common.ReactiveStreamObserverPublisherBase.onNext(ReactiveStreamObserverPublisherBase.java:106)
	at com.salesforce.reactivegrpc.common.ReactiveStreamObserverPublisherClient.onNext(ReactiveStreamObserverPublisherClient.java:51)
	at com.salesforce.reactivegrpc.common.ReactiveConsumerStreamObserver.onNext(ReactiveConsumerStreamObserver.java:52)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:406)
	at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
	at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:526)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
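
A possible reading of the trace above, offered as a sketch rather than a diagnosis: in RxJava 2, concatMap called without a prefetch argument keeps only a two-slot inner queue and requests that many items upstream, and its onNext raises "Queue full?!" when an offer to that queue is rejected. That can happen only when upstream delivers more items than were requested. rebatchRequests(1024) puts a 1024-slot buffer in front of concatMap, which would absorb such a burst. The plain-Java model below uses hypothetical names (BoundedPrefetchSketch, pushWithoutDemand) and no RxJava classes; it only shows how a small bounded queue overflows when items arrive without matching demand.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical stand-in for an operator with a fixed prefetch queue; not RxJava code.
class BoundedPrefetchSketch {
    private final ArrayBlockingQueue<Integer> queue;

    BoundedPrefetchSketch(int prefetch) {
        this.queue = new ArrayBlockingQueue<>(prefetch);
    }

    // Same shape as the failing onNext in the trace: a rejected offer is
    // treated as a back-pressure violation by upstream.
    void onNext(int item) {
        if (!queue.offer(item)) {
            throw new IllegalStateException("Queue full?!");
        }
    }

    // Pushes `count` items while the (slow) consumer drains nothing.
    // Returns how many items were accepted before the queue overflowed.
    static int pushWithoutDemand(int prefetch, int count) {
        BoundedPrefetchSketch operator = new BoundedPrefetchSketch(prefetch);
        int accepted = 0;
        try {
            for (int i = 0; i < count; i++) {
                operator.onNext(i);
                accepted++;
            }
        } catch (IllegalStateException overflow) {
            // Upstream delivered more than the queue could hold.
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(pushWithoutDemand(2, 10));    // prints 2: overflow on the third item
        System.out.println(pushWithoutDemand(1024, 10)); // prints 10: the larger buffer absorbs the burst
    }
}
```

If this model matches what happens in the demo, the open question is which stage delivers items beyond the requested amount; the sketch does not answer that.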

ReactiveExecutor overrides NettyServerBuilder.executor?

What is the rationale behind calling observeOn(…) in ServerCalls.java? For example here:

.observeOn(Schedulers.from(ReactiveExecutor.getSerializingExecutor()))));

The effect is that even if you provide a custom executor to NettyServerBuilder, the execution gets transferred to the default unbounded thread pool. Is this necessary? Is there a way to override this behavior?

Right now when clients connect to a server, it spawns tens of grpc-default-executor threads. Some of them seem to be used by GRPC's DnsNameResolver, which is OK, as discussed here:
grpc/grpc-java#1400
grpc/grpc-java#3703

But others come from the unbounded pool.

I am still new to the RxJava world but I would expect rxgrpc to be transparent in terms of threading, i.e. leave it to GRPC and the stub implementations.
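
The effect described above can be reproduced with JDK executors alone. The sketch below is not gRPC or rxgrpc code; the class name, method name, and thread names are hypothetical. It assumes a transport that invokes its callback on the user-configured executor, and compares a handler that runs in place with one that is handed off to a second executor, which is roughly what an observeOn-style hop does.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorHopSketch {
    // Returns the name of the thread the handler ran on.
    // hop == false: the handler runs on the executor the user configured.
    // hop == true:  the handler is re-submitted to a second, library-owned executor.
    static String handlerThread(boolean hop) {
        ExecutorService custom =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-executor"));
        ExecutorService library =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "library-executor"));
        try {
            Callable<String> handler = () -> Thread.currentThread().getName();
            // The "transport" always starts on the custom executor.
            return custom.submit(
                    () -> hop ? library.submit(handler).get() : handler.call()).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            custom.shutdown();
            library.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handlerThread(false)); // prints custom-executor
        System.out.println(handlerThread(true));  // prints library-executor
    }
}
```

With the hop in place, the executor chosen by the caller only controls where the callback starts, not where the handler runs.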

delegateStub methods return void, but Rx*Grpc expects return type StreamObserver

I'm using this protocol buffers specification:

syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

service Frontend {
	rpc Heartbeat(HeartbeatRequest) returns (google.protobuf.Empty);
}
message HeartbeatRequest {
	google.protobuf.Timestamp timestamp = 1;
}

The generated method looks like this:

public io.reactivex.Single<com.google.protobuf.Empty> heartbeat(io.reactivex.Single<Frontend.HeartbeatRequest> rxRequest) {
    return com.salesforce.rxgrpc.stub.ClientCalls.oneToOne(rxRequest, delegateStub::heartbeat);
}

Besides the issue described in #52, the generated code has an additional error at delegateStub::heartbeat.

The heartbeat method does not return a StreamObserver but instead expects to get one passed as an argument and returns void:

public void heartbeat(FrontendOuterClass.HeartbeatRequest request,
    io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
  asyncUnaryCall(
      getChannel().newCall(getHeartbeatMethod(), getCallOptions()), request, responseObserver);
}
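
The type mismatch can be shown without gRPC on the classpath. In the sketch below, Observer, Stub, and callUnary are hypothetical stand-ins, not the generated stub or the rxgrpc API. A method that takes a request plus a response observer and returns void fits a two-argument consumer shape; it cannot be passed where a function returning an observer is expected.

```java
import java.util.function.BiConsumer;

class DelegateShapeSketch {
    // Minimal stand-in for io.grpc.stub.StreamObserver, declared here so the sketch compiles alone.
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    // Stand-in for the generated async stub: request and response observer in, void out.
    static class Stub {
        void heartbeat(String request, Observer<String> responseObserver) {
            responseObserver.onNext("ack:" + request);
            responseObserver.onCompleted();
        }
    }

    // A unary bridge whose delegate parameter matches the void, two-argument stub method.
    static String callUnary(String request, BiConsumer<String, Observer<String>> delegate) {
        StringBuilder out = new StringBuilder();
        delegate.accept(request, new Observer<String>() {
            @Override public void onNext(String value) { out.append(value); }
            @Override public void onCompleted() { out.append("|done"); }
        });
        return out.toString();
    }

    public static void main(String[] args) {
        Stub stub = new Stub();
        // Compiles because heartbeat(String, Observer<String>) returning void is a valid BiConsumer.
        System.out.println(callUnary("ping", stub::heartbeat)); // prints ack:ping|done
    }
}
```

Changing the delegate parameter to a Function that returns an Observer makes the same method reference fail to compile, which is the kind of error reported here.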

Unary call blocks indefinitely if the server completes empty

Create a unary RPC. On the server, have it immediately call responseObserver.onCompleted() with no previous onNext(). With an io.grpc client call, this RPC throws an INTERNAL error (see grpc/grpc-java#2785) because one-to-one calls must either fail or have a message. With reactor-grpc (0.8.1), the returned Mono neither completes nor fails, causing it to block indefinitely.

Service:

rpc test (google.protobuf.Empty) returns (google.protobuf.UInt64Value);

Server:

public void test(Empty request, StreamObserver<UInt64Value> responseObserver) {
   // Complete immediately without emitting a value.
   ServerCalls.oneToOne(request, responseObserver, x -> Mono.empty());
}

Client:

Mono<Empty> request = Mono.just(Empty.getDefaultInstance());
Mono<UInt64Value> response = ClientCalls.oneToOne(request, stub::test);
System.out.println(response.blockOptional().isPresent());

Workaround: define the RPC as server-streaming (one-to-many) and call flux.singleOrEmpty().blockOptional(), which kind of defeats the purpose of Mono.
(I'm not sure if the one-to-one case should fail or return an empty Mono, but at least it should not block).
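
As a sketch of the "at least it should not block" requirement, the JDK-only code below bridges observer callbacks to a CompletableFuture and terminates it on every terminal signal, including a completion that carries no value. Observer, bridge, and runEmptyServer are hypothetical names, not reactor-grpc code, and completing with an empty Optional is only one of the two outcomes mentioned above; failing the future would be the other.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class UnaryCompletionSketch {
    // Minimal stand-in for io.grpc.stub.StreamObserver.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Bridges callbacks to a future that is completed by onError and by onCompleted,
    // so a caller waiting on it is released even when no value ever arrived.
    static <T> Observer<T> bridge(CompletableFuture<Optional<T>> result) {
        return new Observer<T>() {
            private T last;
            @Override public void onNext(T value) { last = value; }
            @Override public void onError(Throwable t) { result.completeExceptionally(t); }
            @Override public void onCompleted() { result.complete(Optional.ofNullable(last)); }
        };
    }

    // Simulates a server that calls onCompleted() with no previous onNext().
    static Optional<Long> runEmptyServer() {
        CompletableFuture<Optional<Long>> result = new CompletableFuture<>();
        Observer<Long> observer = bridge(result);
        observer.onCompleted();
        return result.join(); // returns immediately: the future is already complete
    }

    public static void main(String[] args) {
        System.out.println(runEmptyServer().isPresent()); // prints false
    }
}
```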
