
Comments (9)

cbornet commented on May 22, 2024

@tukez Can you give more details on how you disconnect the channel? Ideally, a PR with a failing test case.
We have tests for cancellation, but some cases may not be covered.

from reactive-grpc.

tukez commented on May 22, 2024

I couldn't produce a failing test case. This is what I tried (notice the Thread.sleep(1000) and the comment):

@Test
public void disconnectionCancelsClientStream() throws Exception {
    ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel);
    
    AtomicBoolean requestWasSubscribed = new AtomicBoolean(false);
    AtomicBoolean requestWasCanceled = new AtomicBoolean(false);
    AtomicBoolean requestDidProduce = new AtomicBoolean(false);
    
    ReplayProcessor<NumberProto.Number> requestProcessor = ReplayProcessor.create(1);
    requestProcessor.onNext(protoNum(0));
    
    Flux<NumberProto.Number> request = requestProcessor.doOnNext(n -> {
        requestDidProduce.set(true);
        System.out.println("P: " + n.getNumber(0));
    }).doOnSubscribe(sub -> {
        requestWasSubscribed.set(true);
        System.out.println("Client stream subscribed");
    }).doOnCancel(() -> {
        requestWasCanceled.set(true);
        System.out.println("Client stream canceled");
    });
    
    Flux<NumberProto.Number> observer = stub
            .twoWayPressure(request)
            .doOnNext(number -> System.out.println(number.getNumber(0)))
            .doOnError(throwable -> System.out.println(throwable.getMessage()));
    
    // Fails without the sleep (as expected: the request was not canceled) but passes with it; not sure why.
    // Note that server.shutdownNow() is not the same as killing the server process.
    Thread.sleep(1000);
    server.shutdownNow();
    try {
        StepVerifier.create(observer).verifyError(StatusRuntimeException.class);
        assertThat(requestWasSubscribed.get()).isTrue();
        assertThat(requestWasCanceled.get()).isTrue();
        assertThat(requestDidProduce.get()).isTrue();
    } finally {
        stopServer(); // Cleanup after server.shutdownNow()
        setupServer(); // For next test
    }
}

I made a standalone test server and client to reproduce the bug.

  1. Start server
  2. Start client
  3. Kill server
  4. Client should print "Client stream canceled" (it won't; that's the bug)

Server:

import java.io.IOException;

import com.google.protobuf.Empty;
import com.salesforce.servicelibs.NumberProto;
import com.salesforce.servicelibs.ReactorNumbersGrpc;

import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TestServer {

    public static void main(String[] args) throws InterruptedException, IOException {
        TestService svc = new TestService();
        Server server = NettyServerBuilder.forPort(8888).addService(svc).build();
        server.start();
        server.awaitTermination();
    }
    
    private static class TestService extends ReactorNumbersGrpc.NumbersImplBase {

        @Override
        public Flux<NumberProto.Number> responsePressure(Mono<Empty> request) {
            throw new UnsupportedOperationException();
        }

        @Override
        public Mono<NumberProto.Number> requestPressure(Flux<NumberProto.Number> request) {
            throw new UnsupportedOperationException();
        }

        @Override
        public Flux<NumberProto.Number> twoWayPressure(Flux<NumberProto.Number> request) {
            return request.map(n -> {
                System.out.println(n.getNumber(0));
                return n.toBuilder().clearNumber().addNumber(n.getNumber(0) + 1).build();
            });
        }
    }
    
}

Client:

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import com.salesforce.servicelibs.NumberProto;
import com.salesforce.servicelibs.ReactorNumbersGrpc;

import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.SignalType;

public class TestClient {

    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", 8888).usePlaintext(true).build();
        monitorState(channel, channel.getState(false));
        
        ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel).withWaitForReady();
        
        ReplayProcessor<NumberProto.Number> requestProcessor = ReplayProcessor.create(1);
        requestProcessor.onNext(protoNum(0));
        
        Flux<NumberProto.Number> request = requestProcessor
                .doOnNext(n -> System.out.println("Produced: " + n.getNumber(0)))
                .doOnSubscribe(sub -> System.out.println("Client stream subscribed"))
                .doOnCancel(() -> System.out.println("Client stream canceled"));
        
        Flux<NumberProto.Number> observer = 
                Flux.defer(() -> {
                    return stub.twoWayPressure(request);
                    //MonoProcessor<SignalType> stopSignal = MonoProcessor.create();
                    //Flux<NumberProto.Number> stoppableRequest = request.takeUntilOther(stopSignal);
                    //return stub.twoWayPressure(stoppableRequest).doFinally(stopSignal::onNext);
                })
                .doOnNext(number -> System.out.println(number))
                .doOnError(throwable -> System.out.println(throwable.getMessage()));
        observer.retryWhen(errorFlux -> errorFlux.delayElements(Duration.ofSeconds(1))).subscribe();
        
        channel.awaitTermination(1, TimeUnit.HOURS);
    }
    
    private static NumberProto.Number protoNum(int i) {
        Integer[] ints = {i};
        return NumberProto.Number.newBuilder().addAllNumber(Arrays.asList(ints)).build();
    }
    
    private static void monitorState(ManagedChannel channel, ConnectivityState state) {
        channel.notifyWhenStateChanged(state, () -> {
            ConnectivityState newState = channel.getState(true);
            System.out.println("Connection state: " + state + " -> " + newState);
            monitorState(channel, newState);
        });
    }
    
}

cbornet commented on May 22, 2024

@rmichela Do you know how gRPC behaves when the channel disconnects? Does it emit an error?

rmichela commented on May 22, 2024

That really depends on how gRPC disconnects.

The only "safe" ways to close a stream are explicitly calling onComplete(), onError(), or cancel(). The client and server can also call shutdown(), which sends an HTTP/2 GOAWAY frame to all connected peers, initiating a graceful disconnect.
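To make the explicit termination signals concrete, here is a minimal sketch using the JDK's java.util.concurrent.Flow interfaces (the JDK 9+ copy of the Reactive Streams interfaces) rather than gRPC or Reactor. RangePublisher is a hypothetical synchronous publisher written only for illustration: the subscriber ends the stream by calling cancel() after three items, and the publisher stops emitting as soon as it observes the cancellation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class CancelSketch {

    // Hypothetical synchronous publisher that emits 0..count-1 on demand.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Emit up to n items, stopping as soon as cancel() has been called.
                    for (long i = 0; i < n && !done && next < count; i++) {
                        subscriber.onNext(next++);
                    }
                    // Signal onComplete() only if the range was exhausted without cancellation.
                    if (!done && next == count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        new RangePublisher(10).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received = 0;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;          // store before requesting: request() emits synchronously
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                log.add("next " + item);
                if (++received == 3) {
                    subscription.cancel(); // explicit end of the stream from the subscriber side
                    log.add("canceled");
                }
            }

            @Override
            public void onError(Throwable t) { log.add("error"); }

            @Override
            public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints [next 0, next 1, next 2, canceled]: neither onComplete() nor onError() is delivered, because cancel() already terminated the stream.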

I've seen noisy exceptions in gRPC when the client or server process is unexpectedly killed.

rmichela commented on May 22, 2024

@tukez Can you tell me more about the behavior you expect when a server is ungracefully killed? Is an exception escaping your onError handler?

Reactive-gRPC tries to follow the behavior of gRPC as closely as possible. When a server unexpectedly dies mid-stream, that is indeed an error and should be reported as such, whereas stream cancellation is something the server does by choice. Your client may need to react differently to these two situations, so it has to be able to distinguish between them.

tukez commented on May 22, 2024

@rmichela When the server is killed, I expect the subscription to the request flux to be canceled. By request flux, I mean the flux passed as the parameter to the RPC. In this case the error happens on the server side (in terms of flux subscription and production, the server is the subscriber), so the subscription should be canceled.

rmichela commented on May 22, 2024

I've refactored the middle of your client to make the pipeline clearer.

requestProcessor
    // Upstream
    .doOnNext(n -> System.out.println("Produced: " + n.getNumber(0)))
    .doOnSubscribe(sub -> System.out.println("Upstream stream subscribed"))
    .doOnCancel(() -> System.out.println("Upstream stream canceled"))
    .doOnError(t -> System.out.println("Upstream exception " + t.getMessage()))
    // Service Call
    .compose(stub::twoWayPressure)
    // Downstream
    .doOnNext(number -> System.out.println(number))
    .doOnCancel(() -> System.out.println("*** Downstream stream canceled"))
    .doOnError(throwable -> System.out.println("Downstream exception " + throwable.getMessage()))
    .retryWhen(errorFlux -> errorFlux.delayElements(Duration.ofSeconds(1)))
    .subscribe();

If I understand your request, when an exception occurs in gRPC, an error should be propagated downstream, but a cancellation should be propagated upstream?

rmichela commented on May 22, 2024

I've been digging into the Rx specs and you are correct. Errors in the middle of a stream should propagate an onError() downstream and a cancel() upstream.
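The rule can be sketched without Reactor, again using java.util.concurrent.Flow. CallRelay and transportFailed are hypothetical names standing in for the gRPC call and for the moment its transport dies; this is an illustration of the signal directions under those assumptions, not reactive-grpc's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class ErrorPropagationSketch {

    // Hypothetical stand-in for a bidirectional call: it consumes the request
    // stream (its upstream) and emits responses to the caller (its downstream).
    static final class CallRelay<T> implements Flow.Subscriber<T>, Flow.Publisher<T> {
        private Flow.Subscription upstream;
        private Flow.Subscriber<? super T> downstream;

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            downstream = subscriber;
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* response demand not modeled */ }
                // A caller canceling the responses also cancels the request stream.
                @Override public void cancel() { if (upstream != null) upstream.cancel(); }
            });
        }

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }
        // A real call would write the item to the wire, then ask for the next one.
        @Override public void onNext(T item) { upstream.request(1); }
        @Override public void onError(Throwable t) { if (downstream != null) downstream.onError(t); }
        @Override public void onComplete() { }

        // Called when the transport fails mid-stream (e.g. the server is killed).
        void transportFailed(Throwable cause) {
            if (upstream != null) upstream.cancel();           // cancel() travels upstream
            if (downstream != null) downstream.onError(cause); // onError() travels downstream
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CallRelay<Integer> call = new CallRelay<>();

        // Request stream that only records whether it was canceled.
        Flow.Publisher<Integer> requests = s -> s.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { events.add("request stream canceled"); }
        });
        requests.subscribe(call);

        // Response subscriber that records how the stream terminated.
        call.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable t) { events.add("response error: " + t.getMessage()); }
            @Override public void onComplete() { events.add("response complete"); }
        });

        call.transportFailed(new RuntimeException("UNAVAILABLE"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints [request stream canceled, response error: UNAVAILABLE]: the request flux sees cancel(), which is what tukez expected, while the response subscriber still receives the error.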

rmichela commented on May 22, 2024

@tukez I've added upstream cancellation in addition to downstream error propagation. This should fix your reported issue. If you are still having a problem, please re-open the issue.
