Comments (15)

nocquidant commented on August 25, 2024

Any clues on this "bridge" implementation, even an imperfect one that I could implement myself? I plan to use Vertx for my next project, but I would prefer to use Spring Reactor.

Thanks
--nick

from reactor-netty.

smaldini commented on August 25, 2024

I don't think WebFlux is the issue here. The problem, and the reason I moved this to the next version, is that you highlight a usage/discoverability issue with the provided TcpClient. There are other related issues, such as #82, #24 and #41.

With TcpClient you can create a client as simply as

TcpClient client = TcpClient.create("ip", 123);
//....

@PostMapping("/person")
Mono<Void> create(@RequestBody Publisher<ByteBuffer> personStream) {
	return this.client.newHandler((in, out) -> out.sendByteBuffer(personStream));
}

We do plan to improve the TcpClient API, though, and of course document it.

nocquidant commented on August 25, 2024

Sorry, I don't get it. Actually, I want to develop a small middleware with Spring Reactor that connects to a mainframe over TCP/IP and responds to a Java client over HTTP.

... ------> HTTP:8080[JavaClient] ------> HTTP:8888[SpringReactor] ------> TCP/IP:60000[MainFrame]

So, what I am trying to achieve is (see the comments):

@RestController
public class MyController {
	private TcpClient client = TcpClient.create("mainframeip", 60000);

	@PostMapping("/")
	public Flux<byte[]> doPost(@RequestBody Publisher<byte[]> command) throws DecoderException {
		// - First, I need to aggregate all the bytes of the incoming JSON command object,
		//   because I need to run some checks/transformations before sending it to the mainframe.
		// - Then I "wait" for a response from the mainframe. The first 2 bytes received are
		//   the length of the message (the mainframe doesn't close connections),
		//   but I also need to aggregate the whole response, as I have to transform it into JSON.
		// - Last, I send the generated JSON to the HTTP Java client.
		// NOTE: the HTTP Java client and the mainframe are legacy code, so they are blocking.

		// ?!
		return this.client.newHandler((in, out) -> out.sendByteArray(command));
	}
}
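
The framing step described in the comments above (a 2-byte length followed by the message, over a connection that is never closed) can be sketched independently of Reactor. This is only a minimal sketch: the class name LengthPrefixedDecoder and its methods are hypothetical, and it assumes the length is an unsigned big-endian 16-bit value that counts the payload only, which the thread does not confirm.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Accumulates TCP chunks and emits complete frames of the form [2-byte length][payload]. */
final class LengthPrefixedDecoder {
    // Bytes received so far that do not yet form a complete frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Prepends the (assumed) 2-byte big-endian length header to a payload. */
    static byte[] encode(byte[] payload) {
        if (payload.length > 0xFFFF) {
            throw new IllegalArgumentException("payload too large for a 2-byte length");
        }
        byte[] frame = new byte[payload.length + 2];
        frame[0] = (byte) (payload.length >>> 8);
        frame[1] = (byte) payload.length;
        System.arraycopy(payload, 0, frame, 2, payload.length);
        return frame;
    }

    /** Feeds one chunk as read from the socket; returns every payload completed by it. */
    List<byte[]> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] buf = pending.toByteArray();
        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (buf.length - pos >= 2) {
            int len = ((buf[pos] & 0xFF) << 8) | (buf[pos + 1] & 0xFF);
            if (buf.length - pos - 2 < len) {
                break; // payload not fully received yet
            }
            byte[] payload = new byte[len];
            System.arraycopy(buf, pos + 2, payload, 0, len);
            frames.add(payload);
            pos += 2 + len;
        }
        // Keep only the unconsumed tail for the next call.
        pending.reset();
        pending.write(buf, pos, buf.length - pos);
        return frames;
    }
}
```

Each chunk coming from the connection would be passed to feed(...), and a response is complete as soon as feed(...) returns a payload. In a Netty pipeline, the existing LengthFieldBasedFrameDecoder handler does the same job and is probably the better fit.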

Does it make sense?
Thanks
--nick

elviskim commented on August 25, 2024

@nocquidant Did you solve this?

nocquidant commented on August 25, 2024

Actually, we decided to use Vertx for our middleware, which is quite cool but also quite far from our tech stack...

elviskim commented on August 25, 2024

@nocquidant Thanks. I have the same issue, but I have no idea how to use TcpClient asynchronously.
All the examples are blocking (e.g. start..).

rstoyanchev commented on August 25, 2024

@elviskim what are you referring to? What examples are blocking? Did you see Stephane's comment?

creatorKoo commented on August 25, 2024

I really need this feature: a TcpClient that can be used like WebClient.

For now, I have worked around it with my own implementation, which adds a Netty handler that calls ctx.fireChannelInactive().
(If you want more information, just ask.)

BTW, who maintains this feature now?

violetagg commented on August 25, 2024

@creatorKoo please give us more details

creatorKoo commented on August 25, 2024

> @creatorKoo please give us more details

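import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import reactor.core.publisher.Mono;
import reactor.netty.tcp.TcpClient;

public class CustomConnectionHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Forward the message, then fire channelInactive (the workaround described above).
        ctx.fireChannelRead(msg).fireChannelInactive();
    }
}

---

Mono<CustomTcpResponse> request(Bootstrap tcpInfo, CustomTcpRequest request) {
    return TcpClient.newConnection()
        .connect(tcpInfo)
        // Install the custom codec and the handler above on the new connection.
        .map(conn -> conn
            .addHandlerLast(new CustomDecoder())
            .addHandlerLast(new CustomEncoder())
            .addHandlerLast(new CustomConnectionHandler()))
        // Send the request, read exactly one response, then close the connection.
        .flatMap(conn -> conn.outbound().sendObject(request).then()
            .thenMany(conn.inbound().receiveObject().cast(CustomTcpResponse.class))
            .single()
            .doAfterTerminate(conn::dispose));
}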

This is my own alternative TcpClient usage.
This implementation doesn't use a connection pool, so I really need official support for this use case.

Is it helpful, @violetagg?

creatorKoo commented on August 25, 2024

Also, if you use a connection pool with the custom TcpClient, it doesn't support keep-alive or connection management.
So I need that feature too (#898).

violetagg commented on August 25, 2024

@creatorKoo Having in mind the discussion in #898, do we need something in addition here (except documenting this use case)?

creatorKoo commented on August 25, 2024

@violetagg

Yes. For a better solution, the following needs to change:

The TcpClient must be reused, and 'markPersistent' should be used in place of 'doAfterTerminate'. (#898 (comment))

I use that code in a production environment, and it works well so far.

nmina commented on August 25, 2024

I know this is quite old but I'm trying to implement the same concept. Basically an HTTP REST service which needs to connect to a backend server via TCP. I have the following implementation currently but I'm not sure if it's optimal:

Spring Controller:

@RestController
@RequestMapping("/client")
public class ClientController {

  private final SampleTcpClient sampleTcpClient;

  @Inject
  public ClientController(SampleTcpClient sampleTcpClient) {
    this.sampleTcpClient = sampleTcpClient;
  }

  @PostMapping
  Mono<ResponseEntity<Obj>> postValue(@RequestBody ClientRequest request) {
    return this.sampleTcpClient
        .sendRequest(request.getRequestAttr()) // say, a simple string for simplicity.
        // mapToObjResponse: helper (not shown) that builds the ResponseEntity<Obj>.
        .map(str -> mapToObjResponse(str));
  }
}

SampleTcpClient:

import org.springframework.stereotype.Component;

import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;


@Component
public class SampleTcpClient {

  private Connection clientConn;

  private DirectProcessor<byte[]> directProcessor;

  private FluxSink<byte[]> fluxSink;

  // Constructor
  public SampleTcpClient() {
      this.directProcessor = DirectProcessor.create();
      this.fluxSink = directProcessor.sink();
      this.clientConn = TcpClient.create()
                .host("serverHostIPHere")
                .port(1234)
                .handle((inbound, outbound) -> {
                    // setup inbound here.
                    inbound.receive()
                           .asByteArray()
                           .onBackpressureBuffer()
                           .subscribe(bytes -> this.fluxSink.next(bytes));

                    return Mono.never();
                })
                // handle(...) only configures the client; connectNow() opens the Connection.
                .connectNow();
  }

  public Mono<String> sendRequest(String requestVal) {
      return Mono.create(sink -> {
          // A sequence number to track which response message goes 
          // to which request message.
          final long sequenceNo = generateSequenceNo();

          // setup DirectProcessor subscriber here.
          final Disposable sub = this.directProcessor
                    .subscribe(bytes -> {
                        if (getSequenceNo(bytes) == sequenceNo) {
                            sink.success(parseBytesToResponseString(bytes));
                        }
                    });

           // dispose the subscriber.
           sink.onDispose(() -> sub.dispose());

          this.clientConn
              .outbound()
              .sendByteArray(Mono.just(requestVal.getBytes())) // imagine the sequenceNo is included in the request.
              .subscribe();
      });

  }
}

My concerns about the code above are:

  1. inbound.receive() can only be subscribed to once. Otherwise it throws java.lang.IllegalStateException: Only one connection receive subscriber allowed.
  2. That led me to experiment with FluxProcessors such as DirectProcessor, as seen in the example code. However, the Project Reactor documentation on processors says we should avoid using them where possible.
  3. I am not happy with subscribing to the DirectProcessor for each request and disposing of the subscription afterwards. I am not sure this is the correct approach.
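
One possible way around the per-request subscription in the concerns above is to keep a map of pending requests keyed by sequence number, and let the single inbound subscriber complete the matching entry. The following is a minimal sketch using only the JDK, not an official Reactor Netty pattern: PendingRequests, register and complete are hypothetical names, and it assumes every response carries the sequence number of its request, as getSequenceNo(bytes) implies in the code above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Matches responses to requests by sequence number over one shared connection. */
final class PendingRequests {
    private final ConcurrentMap<Long, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();
    private final AtomicLong lastSequenceNo = new AtomicLong();

    /** Allocates a sequence number for a request that is about to be written. */
    long nextSequenceNo() {
        return lastSequenceNo.incrementAndGet();
    }

    /** Registers interest in a response; call this before writing the request. */
    CompletableFuture<byte[]> register(long sequenceNo) {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        pending.put(sequenceNo, future);
        // Drop the entry if the caller cancels or otherwise completes the future first.
        future.whenComplete((bytes, error) -> pending.remove(sequenceNo, future));
        return future;
    }

    /** Called by the single inbound subscriber; returns false if nobody is waiting. */
    boolean complete(long sequenceNo, byte[] response) {
        CompletableFuture<byte[]> future = pending.remove(sequenceNo);
        return future != null && future.complete(response);
    }

    /** Number of requests still waiting for a response. */
    int size() {
        return pending.size();
    }
}
```

With something like this, the one inbound.receive() subscriber would call complete(getSequenceNo(bytes), bytes), and sendRequest could return Mono.fromFuture(pendingRequests.register(sequenceNo)) after writing the request, so no processor and no per-request subscription would be needed. A timeout on the returned Mono would still be needed to avoid waiting forever on a lost response.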

violetagg commented on August 25, 2024

I updated the documentation with an example that uses Connection#outbound and Connection#inbound as an alternative to the I/O handler (#1647).
Closing this issue.
