reactor / reactor Goto Github PK
View Code? Open in Web Editor NEWReactor Bill Of Materials (tracking reactor-core, reactor-netty and more)
Home Page: https://projectreactor.io
License: Apache License 2.0
Reactor Bill Of Materials (tracking reactor-core, reactor-netty and more)
Home Page: https://projectreactor.io
License: Apache License 2.0
The TcpNioSSLConnection
was pretty much a direct port from SI and had to be munged to work with the reactor threading model.
It needs some work to make it cleaner.
The test creates this Composable (System.out.println are mine):
private Composable<Integer> createComposable(Dispatcher dispatcher) {
return new Composable<Integer>(dispatcher)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
System.out.println("mapping: "+integer);
return integer;
}
})
.reduce(new Function<Composable.Reduce<Integer, Integer>, Integer>() {
@Override
public Integer apply(Composable.Reduce<Integer, Integer> r) {
int last = (null != r.getLastValue() ? r.getLastValue() : 1);
System.out.println("reducing: last value: "+r.getLastValue()+", nextValue: "+r.getNextValue());
return last + r.getNextValue();
}
})
.consume(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.println("consuming: "+integer);
latch.countDown();
}
});
}
When I run the test, only consume method prints to console. Map and Reduce do not do anything.
Not sure what the test it trying to test - just speed of consuming (then kill map and reduce) or cumulative processing as well (then there is a bug somewhere because map and reduce don't produce any output)
It's a little verbose to create a Selector/Object combination now if you want to do explict p2p eventing. You have to do something like:
Object objKey = new Object();
Selector objSel = $(objKey);
reactor.on(objSel, ...);
reactor.notify(objKey, ...);
It would be useful to have a helper function that returned a Tuple2<Selector, Object>
that would generate these things for you so that the above would become:
Tuple2<Selector, Object> obj = Fn.createSelector(); // or called something else
reactor.on(obj.getT1(), ...);
reactor.notify(obj.getT2(), ...);
It would be useful to have a Tuple that could hold arbitrary values of different types for managing APIs that provide interfaces with multiple arguments. If there's no Tuple class, the user would have to create their own classes to hold the type-safe references to different objects.
That's still a possibility, of course, but it would be generally useful to have a Tuple class that allowed holding multiple values and then providing a hierarchy of Tuple1, Tuple2, etc... that provided a generic signature and methods for getting the values of the Tuple in a type-safe way.
Support for Grails Events story
This problem is somewhat similar to #44.
Registration.cancelAfterUse
can be used to cancel a registration after it's been used. However, there's no guarantee of how many times it will be used before it's cancelled. This is due to a possible race between the registration being made (which makes it available for selection) and cancelAfterUse
being called. It may be sufficient to document that cancelAfterUse
cannot be used to guarantee at-most-once use, and perhaps provide an alternative mechanism that does guarantee at-most-once use.
Currently the ReactorFactoryBean
doesn't allow you to set the Dispatcher
to use.
One of the key LMAX RingBuffer features is the events-bulking, we have some use cases especially w.r.t. data ingestion where this would be very interesting to explore.
Currently if a loop fails, we loose it, restarting the loop in that case would be great.
Having a safe guard (like http://gpars.codehaus.org/Agent and http://doc.akka.io/docs/akka/2.1.0/scala/agents.html)
is an interesting pattern for lock-free data write. It seems to me it only needs to tweak schedule a bit (or rename it?).
Doing logging in an async handler can be extremely detrimental to the overall performance of an application. It spends a lot of time trying to lock output streams and the like.
I propose we create our own Dispatcher-based SLF4J logging implementation that will eliminate these problems by having it's own dedicated Dispatcher(s) for doing logging. Then we can make the SLF4J API completely asynchronous and eliminate locking issues when using standard logging tools in asynchronous handlers.
If I do this:
./gradlew build
I get this:
BUILD FAILED
can't we go back to maven?
At least it just works (mostly)
A Consumer
's Registration
is returned when the Consumer
is passed to Reactor.on
. As things stand, if the Consumer
wants to access its own Registration
it cannot do so without being at risk of a race condition: the Consumer
's accept()
method may be called before Reactor.on
has returned so it won't have access to its Registration
.
One solution would be for a Consumer
to optionally implement RegistrationAware
and have the registry pass the Registration
to the Consumer
before it's actually added to the registry - this would ensure that the Registration
is set before accept
can be called.
Inspired by a question/suggestion from Peter Hausel: https://groups.google.com/forum/?fromgroups=#!topic/reactor-framework/qVWGbcPPVEE
A number of types in Reactor allow their configuration to be accessed and modified via getter and setter methods. This mutability complicates thread-safety. Where possible we should move to configuring via constructor injection and private final
fields.
Due to the recent constructor injection changes, it's not very easy to create a Composable that is based on an existing Reactor or that uses a specific Dispatcher implementation.
It would be nice to have a general-purpose builder API for Reactors and things that need Dispatchers so a Builder class that's independent is probably the best long-term way to go.
For the time being, though, I suggest we add a Builder inner class to Composable and have the static from
method return this Builder just to provide a quick and easy alternative to what was previously possible by using the from
method, then calling setDispatcher
.
The current handling of Context
and Dispatcher
lifecycle is a little bit muddled.
The BlockingQueueDispatcher
, RingBufferDispatcher
, and ThreadPoolExecutorDispatcher
all start themselves implicitly in their constructors. BlockingQueueDispatcher
and RingBufferDispatcher
repeat this start processing in their start()
methods, so, if start()
is called, they're relying on the start processing being idempotent. ThreadPoolExecutorDispatcher
does nothing in its start
method.
Context
doesn't start anything implicitly. Its start()
implementation calls start()
on all of its Dispatchers
so this, too, is relying on Dispatcher
start processing being idempotent. The various Dispatchers
that it provides static access to are only started by virtue of their implicit start in their constructors.
The above all feels a bit muddled to me, and it's unclear when a user should call start()
, if at all. At the moment it's not necessary to call start()
which makes the need to call stop()
and destroy()
to clean things up asymmetric.
Since Groovy doesn't support automatic coercion, we need a specific extension for this.
Spring Integration NIO support has an option to use NIO Direct Buffers. This is possible because the the data is copied from the ByteBuffer after the read event is fired and processed and before read events are re-enabled.
With reactor-tcp, the goal was to minimize data copying to the greatest extent, so the read buffer is passed to async decode and read events are immediately reenabled. This means that read events need to use a "new" buffer each time (because the decoding of the previous buffer may not have completed).
It is generally not advised to "churn" direct buffers because the memory allocation can be more expensive than simple heap-based byte buffers.
In order to support direct buffers in reactor-tcp we would likely need some kind of central pool of buffers that can be "borrowed", with a reference count, and returned to the pool when there are no more references.
Since Groovy doesn't support automatic coercion, we need a specific extension for this.
There's lots of logging in the tcp module that's related to the low-level manipulation of bytes, buffers, and the like. Because logging can be so detrimental to an async application's performance, and to reduce the logging overhead in a standard OOTB configuration, we should probably make that logging TRACE level, with only top-level actions logged in DEBUG. Things like connections started and stopped could be DEBUG level, but information about how many bytes are read and whatnot should probably be TRACE level.
Ratpack is a lightweight sinatra-like micro web framework. It does very focused things and does it well.
I think a dedicated module (whereas its a ratpack plugin or a reactor-ratpack artifact) would be very efficent for enabling new async features. It will even make more sense when we will have distribution support especially for people combining ratpack with other heavier backend integrated frameworks, let's say Grails :).
CachingRegistry
currently has two responsibilities:
Selectors
Selectors
that match a particular keyWe should split the two responsibilities into two separate classes.
This is somewhat related to #24
Related to #27, we need to provide a persistence mechanism for events so they don't get lost. When in batch mode (or "paused"), a Reactor should have the capability to store events for later publication when coming out of batch mode (or "resumed").
I think it would be great to have something like JAX-RS ExceptionMappers.
So you could register a type of exception and it would be handled out of the Consumer logic.
Remove the boolean and all references to it.
On the client side, the factory shouldn't be concerned whether a connection is shared or not, it should just hand a new one out when asked. The client should be responsible for reusing and/or closing the connection.
On the server side, socket timeouts can be used to close sockets left open by badly behaved clients.
In Composable, currently:
protected Composable<T> when(Selector sel, final Consumer<T> consumer) {
Consumer<Event<T>> whenConsumer = new Consumer<Event<T>>() {
@Override
public void accept(Event<T> ev) {
consumer.accept(ev.getData());
}
};
if (!isComplete()) {
observable.on(sel, whenConsumer);
}else{
R.schedule(consumer, value, observable);
}
return this;
}
Creation of whenConsumer can be done inside the first if statement.
Either a literal vert.x module or an embedded vert.x inside a Dispatcher implementation...?
There appears to be a race during the startup/initialisation processing of TcpNioServerConnectionFactory
. The build is intermittently failing with output similar to the following:
reactor.tcp.TcpServerReactorTests > tcpServerReactorCanReceiveRequestsNotifyConsumersAndSendResponses STANDARD_ERROR
21-May-2013 05:22:19 Exception in thread "pool-11-thread-1" java.lang.IllegalArgumentException: Factory not initialized
21-May-2013 05:22:19 at reactor.support.Assert.notNull(Assert.java:30)
21-May-2013 05:22:19 at reactor.tcp.TcpNioServerConnectionFactory.run(TcpNioServerConnectionFactory.java:89)
21-May-2013 05:22:19 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
21-May-2013 05:22:19 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
21-May-2013 05:22:19 at java.lang.Thread.run(Thread.java:722)
21-May-2013 05:22:49 Gradle Worker 6 finished executing tests.
21-May-2013 05:22:49
21-May-2013 05:22:49 reactor.tcp.TcpServerReactorTests > tcpServerReactorCanReceiveRequestsNotifyConsumersAndSendResponses FAILED
21-May-2013 05:22:49 java.lang.IllegalStateException: Operation was not complete within 30000ms
21-May-2013 05:22:49 at reactor.tcp.test.TimeoutUtils.doWithTimeout(TimeoutUtils.java:33)
21-May-2013 05:22:49 at reactor.tcp.TcpServerReactorTests.awaitAlive(TcpServerReactorTests.java:106)
21-May-2013 05:22:49 at reactor.tcp.TcpServerReactorTests.tcpServerReactorCanReceiveRequestsNotifyConsumersAndSendResponses(TcpServerReactorTests.java:80)
21-May-2013 05:22:49 Process 'Gradle Worker 6' finished with exit value 0 (state: SUCCEEDED)
I incorrectly used
protected static final reactor.fn.Selector WRITE = $("tcp.write");
...
this.ioReactor.on(WRITE, new Consumer<Event<SelectionKey>> () {
@Override
public void accept(Event<SelectionKey> keyEvent) {
handleWriteSelection(ioSelector, keyEvent.getData());
}
});
...
this.getIoReactor().notify(WRITE, Fn.event(key));
Since the Selector
didn't match, the event was discarded silently; (this previously worked on an older iteration of reactor).
Consider adding DEBUG/TRACE logging to make tracking down these issues easier.
Fix was:
protected static final String WRITE_KEY = "tcp.write";
protected static final reactor.fn.Selector WRITE = $(WRITE_KEY);
...
this.getIoReactor().notify(WRITE_KEY, Fn.event(key));
It's currently not possible to tell from the outside whether a Promise is still pending, whether it has completed successfully, or whether it contains an error.
A reader of the blog post made a very interesting comment about Event ordering [1]. I consider this related to #41 and #27.
I'm comparing Reactor with Akka and Jetlang. I really like the clean Composable/Promise design, and the terseness of Reactor compared especially to Akka since I don't require remote transparency and prefer a concise foundation framework for Java.
One thing that seems missing in Reactor is guarantee on the serialized invocations of Consumers. One answer in the Google group mentions possibility of creating a separate reactor with BlockingQueueDispatcher for such serialized invocations. But in that case, we end up creating bunch of heavyweight threads to have such reactors.
Do you have any plan on introducing such a serialization guarantee? I mean like Fiber in Jetlang getting messages in order, or like actor processing messages in its mailbox one by one.
When the value is null, the Composable is considered to be incomplete. This makes it impossible for the value to actually be null. Instead, we need to use a valueSet boolean, or similar, to track completion.
Might be linked to Configuration/Builder API
Monitoring current dispatched events, and others stats seems to be an important production feature.
For XML-based Spring configuration we should provide an XML namespace for creating Reactors
and Dispatchers
(and anything else we think would benefit from namespace support).
At the moment a single Codec
instance is shared between multiple TcpNioConnection
s. This makes it very hard to write stateful Codec
s. Instead, a Supplier<Codec>
can be used to ensure that each newly created TcpNioConnection
gets a new Codec
instance.
I already have some code from an earlier version of Reactor that uses the Apache Commons httpcomponents core classes for doing HTTP parsing. It's actually quite straightforward and would be an easy way to get HTTP support on top of a raw TCP Reactor.
I also have some custom parsing code that has no dependencies on anything but the JDK. That also might be useful in case the user decided they didn't want to depend on the httpcomponents library. Making the HttpCodec pluggable, then, would allow different parsing implementations to be plugged in, depending on the use case.
If the HTTP parsing classes in Jetty or Tomcat were usable directly, we could also provide implementations that used those classes in case the deployment platform was already one of those servers. I haven't looked to see if this is possible yet, but it would be interesting to find that out.
Looks like the README needs updated to reference the logback module as well as correct the samples after the notify(Object) change.
Promise
current keeps track of its own state, separate to that of its Composable
super class. A side-effect of this is that it's not thread-safe, and success and error handlers may not be called if they're registered while the state is being changed.
A simple configuration API to manage reactors, dispatchers, balancing strategies and others options.
It would be great if we could reuse the existing Buffer class from reactor-core. The reactor-tcp Buffers class is good because it deals with multiple ByteBuffers, but I think it would be better to use a reactor.io.Buffer that wraps a raw ByteBuffer because the Buffer
class has lots of helper methods on it to turn the internal ByteBuffer into various things. to read data as most of the primitive types, and also supports dynamic resizing (though in this case, we're using fixed sizes and zero copy...but when your'e creating these by writing a response, the situation will be different).
It might even be generally useful to have the Buffers class moved to reactor-core, where Buffer already is, so the zero-copy functionality could be used in other things like file access and other byte-centric tasks.
We can provide a default option to attach on(T(Throwable), consumer) to trigger any sl4j error logging as long as you have reactor-logback setup.
This would probably be better combined to a solid Configuration Builder.
Something like @EnableReactors
which creates some default plumbing, with suitable methods to reconfigure the defaults if required.
Be nice to create lazy computation.
We can provide a RetryConsumer that delegates to the desired consumer to retry.
Create a Promise API that provides users an easy way to be notified when deferred values become available.
The JDK 8 time and date APIs use a static method called "of" when creating a new instance from existing data. To better align with JDK 8 in this regard, and because it seems to read better for the Tuple class to use a method called "of" rather than the current "from", I propose we take the advice of twitter user @jodastephen and change Tuple.from to Tuple.of.
CachableRegistration.cancel
removes an entry from registrations
and sets refreshRequired
to true
, but does so without holding the write lock. This isn't thread-safe and, among other things, can result in ConcurrentModificationException
s being thrown when iterating over the registrations.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.