Comments (9)
@tukez can you give more details on how you disconnect the channel ? Ideally a PR with a failing test case.
We have tests on cancellation but maybe some cases are not covered.
from reactive-grpc.
I couldn't produce a failing test case. This is what I tried (notice the Thread.sleep(1000) and the comment):
@Test
public void disconnectionCancelsClientStream() throws Exception {
ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel);
AtomicBoolean requestWasSubscribed = new AtomicBoolean(false);
AtomicBoolean requestWasCanceled = new AtomicBoolean(false);
AtomicBoolean requestDidProduce = new AtomicBoolean(false);
ReplayProcessor<NumberProto.Number> requestProcessor = ReplayProcessor.create(1);
requestProcessor.onNext(protoNum(0));
Flux<NumberProto.Number> request = requestProcessor.doOnNext(n -> {
requestDidProduce.set(true);
System.out.println("P: " + n.getNumber(0));
}).doOnSubscribe(sub -> {
requestWasSubscribed.set(true);
System.out.println("Client stream subscribed");
}).doOnCancel(() -> {
requestWasCanceled.set(true);
System.out.println("Client stream canceled");
});
Flux<NumberProto.Number> observer = stub
.twoWayPressure(request)
.doOnNext(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()));
// Fails without sleep (as expected: request was not canceled) but works with sleep. Not sure what is happening. Also server.shutdownNow() is not same as killing the server.
Thread.sleep(1000);
server.shutdownNow();
try {
StepVerifier.create(observer).verifyError(StatusRuntimeException.class);
assertThat(requestWasSubscribed.get()).isTrue();
assertThat(requestWasCanceled.get()).isTrue();
assertThat(requestDidProduce.get()).isTrue();
} finally {
stopServer(); // Cleanup after server.shutdownNow()
setupServer(); // For next test
}
}
I made a standalone test server and client to reproduce the bug.
- Start server
- Start client
- Kill server
- Client should print "Client stream canceled" (wont print, that's the bug)
Server:
import java.io.IOException;
import com.google.protobuf.Empty;
import com.salesforce.servicelibs.NumberProto;
import com.salesforce.servicelibs.ReactorNumbersGrpc;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class TestServer {
public static void main(String[] args) throws InterruptedException, IOException {
TestService svc = new TestService();
Server server = NettyServerBuilder.forPort(8888).addService(svc).build();
server.start();
server.awaitTermination();
}
private static class TestService extends ReactorNumbersGrpc.NumbersImplBase {
@Override
public Flux<NumberProto.Number> responsePressure(Mono<Empty> request) {
throw new UnsupportedOperationException();
}
@Override
public Mono<NumberProto.Number> requestPressure(Flux<NumberProto.Number> request) {
throw new UnsupportedOperationException();
}
@Override
public Flux<NumberProto.Number> twoWayPressure(Flux<NumberProto.Number> request) {
return request.map(n -> {
System.out.println(n.getNumber(0));
return n.toBuilder().clearNumber().addNumber(n.getNumber(0) + 1).build();
});
}
}
}
Client:
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import com.salesforce.servicelibs.NumberProto;
import com.salesforce.servicelibs.ReactorNumbersGrpc;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.SignalType;
public class TestClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", 8888).usePlaintext(true).build();
monitorState(channel, channel.getState(false));
ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel).withWaitForReady();
ReplayProcessor<NumberProto.Number> requestProcessor = ReplayProcessor.create(1);
requestProcessor.onNext(protoNum(0));
Flux<NumberProto.Number> request = requestProcessor
.doOnNext(n -> System.out.println("Produced: " + n.getNumber(0)))
.doOnSubscribe(sub -> System.out.println("Client stream subscribed"))
.doOnCancel(() -> System.out.println("Client stream canceled"));
Flux<NumberProto.Number> observer =
Flux.defer(() -> {
return stub.twoWayPressure(request);
//MonoProcessor<SignalType> stopSignal = MonoProcessor.create();
//Flux<NumberProto.Number> stoppableRequest = request.takeUntilOther(stopSignal);
//return stub.twoWayPressure(stoppableRequest).doFinally(stopSignal::onNext);
})
.doOnNext(number -> System.out.println(number))
.doOnError(throwable -> System.out.println(throwable.getMessage()));
observer.retryWhen(errorFlux -> errorFlux.delayElements(Duration.ofSeconds(1))).subscribe();
channel.awaitTermination(1, TimeUnit.HOURS);
}
private static NumberProto.Number protoNum(int i) {
Integer[] ints = {i};
return NumberProto.Number.newBuilder().addAllNumber(Arrays.asList(ints)).build();
}
private static void monitorState(ManagedChannel channel, ConnectivityState state) {
channel.notifyWhenStateChanged(state, () -> {
ConnectivityState newState = channel.getState(true);
System.out.println("Connection state: " + state + " -> " + newState);
monitorState(channel, newState);
});
}
}
from reactive-grpc.
@rmichela do you know what is the behavior of gRPC when the channel disconnects ? Does it emit an error ?
from reactive-grpc.
That really depends on the way that gRPC disconnects.
The only "safe" way to close a stream is explicitly calling onComplete()
, onError()
, or cancel()
. The client and server can also shutdown()
, which sends a GO_AWAY frame to all connected peers initiating a graceful disconnect.
I've seen noisy exceptions in gRPC when the client or server process is unexpectedly killed.
from reactive-grpc.
@tukez Can you tell me more about the behavior you are expecting when a server is gracelessly killed? Is an exception escaping your onError handler?
Reactive-gRPC tries to follow the behavior of gRPC as closely as possible. When a server unexpectedly explodes mid stream, this is indeed an error and should be reported as such, whereas stream cancellation is something the server does by choice. How your client reacts to these situations may be different and there is a need to distinguish between them.
from reactive-grpc.
@rmichela When server is killed, I expect the request flux subscription to be cancelled. By request flux, I mean the flux given as a parameter to the RPC. In this case the error is happening on the server side (if you think in terms of flux subscription/production, the server is the subscriber), so the subscription should be cancelled.
from reactive-grpc.
I've refactored the middle of your client to make the pipeline more clear.
requestProcessor
// Upstream
.doOnNext(n -> System.out.println("Produced: " + n.getNumber(0)))
.doOnSubscribe(sub -> System.out.println("Upstream stream subscribed"))
.doOnCancel(() -> System.out.println("Upstream stream canceled"))
.doOnError(t -> System.out.println("Upstream exception " + t.getMessage()))
// Service Call
.compose(stub::twoWayPressure)
// Downstream
.doOnNext(number -> System.out.println(number))
.doOnCancel(() -> System.out.println("*** Downstream stream canceled"))
.doOnError(throwable -> System.out.println("Downstream exception " + throwable.getMessage()))
.retryWhen(errorFlux -> errorFlux.delayElements(Duration.ofSeconds(1)))
.subscribe();
If I understand your request, when an exception occurs in gRPC, an error should be propagated downstream, but a cancellation should be propagated upstream?
from reactive-grpc.
I've been digging into the Rx specs and you are correct. Errors in the middle of a stream should propagate an onError()
downstream and a cancel()
upstream.
from reactive-grpc.
@tukez I've added upstream cancellation in addition to downstream error propagation. This should fix your reported issue. If you are still having a problem, please re-open the issue.
from reactive-grpc.
Related Issues (20)
- Wrong manifest main class rx3grpc HOT 4
- Backpressure Demo has compile error on Windows
- stub.invoke().retry() throws exception HOT 1
- Async resource cleanup failed after onComplete / call already closed HOT 2
- Add onDiscard hook support to AbstractStreamObserverAndPublisher
- host_javabase is deprecated HOT 1
- Possible to have non-publisher for RPC method parameter? HOT 2
- In ManyToOne and ManyToMany, remote method is called before the client subscribe
- project still active? HOT 1
- UnsupportedOperationException thrown in the generated stub with Java 17 HOT 2
- Unexpedted error log HOT 1
- Dropped Error on Bidi Stream After Broken TCP Connection HOT 2
- Subscription Should be Cancelled on Interceptor Close HOT 3
- How about a new stable release? HOT 14
- Migrate reactive-grpc to grpc-ecosystem HOT 2
- Indeterminate cancel or error when streaming bidirectionally
- Question: how can Spring reactive-grpc support credentials on server HOT 2
- Bazel demo does not compile HOT 1
- How to customize ClientInterceptor and ServerInterceptor HOT 1
- `reactor-grpc-1.2.4-osx-x86_64.exe` is removed from mac on intel HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from reactive-grpc.