tower-rs / tower Goto Github PK
View Code? Open in Web Editor NEWasync fn(Request) -> Result<Response, Error>
Home Page: https://docs.rs/tower
License: MIT License
async fn(Request) -> Result<Response, Error>
Home Page: https://docs.rs/tower
License: MIT License
It is common for client libraries to have some kind of Connection
struct,
which have various protocol-related methods, ex. abstract client (let's say, redis-like) can have methods like these:
struct Connection;
impl Connection {
pub fn get(&self, key: String) -> Result<String, ()> { unimplemented!() }
pub fn set(&self, key: String, value: String) -> Result<(), ()> { unimplemented!() }
pub fn delete(&self, key: String) -> Result<(), ()> { unimplemented!() }
}
If that Connection
implements tower::Service
trait, it should provide call
method, ex.
impl Service for Connection {
type Request = MyRequest;
type Response = MyResponse;
type Error = ();
type Future = MyResponseFuture<Item=Self::Response, Error=Self::Error>;
fn call(&mut self, req: Self::Request) -> Self::Future {
unimplemented!()
}
}
Hard part here is that current tokio implementation is not very easy; at least as for me and for a other random people from the internet:
Speaking about Service
again, with a current futures
crate (0.1.20 as for now) it is hard to implement call
method.
For example, if Connection
struct contains Framed<_, _>
in it, it's own sink method can be used to send incoming request to an underline framed object, ex.:
struct Connection {
inner: Framed<TcpStream, MyCodec>
}
fn call(&mut self, req: Self::Request) -> Self::Future {
self.inner.send(req);
// and return a result
}
Two caveats appear here:
Self
, which is okay by itself, but became a problem if you still want to keep it in the Connection
;call
should return future that will resolve into MyResponse
later, but it can't be achieved easily; inner Framed<_, _>
, as a stream, should be poll
ed somehow and somewhere and should resolve returned earlier future (MyResponseFuture
in an example above).@benashford have a redis-async-rs crate, which solves second problem via multiple channels: MyResponseFuture
in that case contains oneshot::Receiver<MyResponse>
and Connection
by itself poll
s inner stream and keeping track of the oneshot::Sender<MyResponse>
instances.
While it seems to solve the problem, it looks kinda complicated and might reduce tokio + tower expansion.
Can we elaborate here and provide more complex examples of how clients should be done properly?
AFAIR, there were examples of the pipelined and multiplexed protocols at the tokio repo earlier, maybe we can do something similar?
I'm not sure how exactly can I help here, but I'm willing too, just give me a hint :)
It should be possible to add a connection pool middleware at the service level.
Should Service
be moved into a tower-service
crate and tower
re-exports it along with the various middlewares, stack builder, etc...?
Currently, Service::poll_ready
must return an error type that is the same type as what can be returned from Service::Future
. Sometimes it doesn't make sense, such as if only some errors can occur when asking for readiness, whereas a larger set of errors may occur when executing the Future
.
An original attempt was made here, however there were a few issues (as described in the comments).
It would be nice to have a service that can handle retrying requests. There are a lot of variables that should be considered for a fully general retry mechanism:
Some prior work in the area: https://crates.io/crates/tokio-retry
This issue proposes to change the contract of the poll_service
function on DirectService
The current implementation of DirectService
is as follows:
pub trait DirectService<Request> {
type Response;
type Error;
type Future: Future<Item = Self::Response, Error = Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error>;
fn poll_service(&mut self) -> Poll<(), Self::Error>;
fn poll_close(&mut self) -> Poll<(), Self::Error>;
fn call(&mut self, req: Request) -> Self::Future;
}
This mirrors the Service
trait while including two additional functions: poll_service
and poll_close
.
Currently, poll_service
is documented as:
Returns
Ready
whenever there is no more work to be done untilcall
is invoked again.Note that this method may return
NotReady
even if there are no
outstanding requests, if the service has to perform non-request-driven
operations (e.g., heartbeats).
Most services, including HTTP 1.0 services, need a way to signal graceful close. poll_service
should be updated such that Ready
is returned when the service has gracefully closed without the user requesting a close by calling poll_close
.
Implementations of DirectService
should make a best effort to return Ready
from poll_service
when the service is closed. However, there is no guarantee that poll_service
will return Ready
before a call to poll_ready
returning an error due to the service being closed. It is sometimes impossible to detect the service being closed without attempting to check for readiness.
This also would allow for a class of middleware that are able to "ignore" inner service errors by mapping service errors to a graceful close. This would be useful for middleware like ConnectionPool
, which needs to be able to tell the difference between a fatal inner service error and an inner service gracefully going away.
Refs #136
cc/ @jonhoo @seanmonstar
Service
is a push based API. Requests are pushed into the service. This makes it vulnerable to situations where the request producer generates requests faster than the service is able to process them.
To mitigate this, Service
provides a back pressure strategy based on the Service::poll_ready
. Before pushing a new request into the service, the producer must call Service::poll_ready
. The service is then able to inform the producer if it is ready to handle the new request. If it is not ready, the service will notify the producer when it becomes ready using the task notification system.
The Service::call
function returns immediately with a future of the response. The producer may call the service repeatedly with new requests before previously returned futures complete. This allows a single service value to concurrently process requests.
This behavior complicates being able to sequentially call services. Consider the and_then
combinator:
let combined = service1.and_then(service2);
combined.ready()
.and_then(|combined| combined.call(my_request));
This will result in my_request
being pushed into service1
, and when the returned future completes, the response is pushed into service2
. Because the response from service1
completes asynchronously, the combined response future must contain a handle to service2
. Because the Service
trait requires &mut self
in order to use, service2
cannot be stored in an Arc
and cloned into the shared response future.
The strategy to handle this is to require service2
to implement Clone
and let each service implementation manage how it will handle dealing with concurrency. Concurrency can be added to any service by adding a layer of message passing. A task is spawned to drive the service itself and a channel is used to buffer queued requests. The channel sender implements Service
. This pattern is provided by buffer
.
The question at hand is how to combine the back pressure API (poll_ready
) with the pattern of cloning service handles to handle concurrency.
The first option is to use the same strategy as channels for back pressure. In short, each Sender
handle has a dedicated buffer of 1 slot (see here for a detailed discussion). Given that it is permitted to clone Service
handles once per request, applying this behavior to Service
would effectively result in unbounded buffering.
Instead, Service::poll_ready
should use a reservation strategy. Calling Service::poll_ready
results in reserving the capacity for the producer to send one request. Once poll_ready
returns Ready
, the next invocation of call
will not result in an out of capacity error.
This strategy also means that it is possible for a service's capacity to be depleted without any requests being in-flight yet. Consider buffer
with a capacity of 1. A service calls poll_ready
before generating the request. In order guarantee that that capacity is available when the request has been produced, the service must reserve a slot. The service then has no remaining capacity but the request has yet to be produced.
and_then
combinatorIn the case of the and_then
combinator, poll_ready
is forwarded to both services. For the combined service to be ready, capacity must be reserved in both the first and the second service. This means that, if the response future for the first service takes significant time to complete, the second service could be starved. This can be mitigated somewhat by adding additional buffering to the second service.
ref:
Line 24 in b95c8d1
Some RPC protocols (e.g., JSON-RPC, msgpack-RPC) support sending notifications, which are requests (without an ID) that in turn do not have an associated response. Is tower equipped to handle this scenario?
The signature of Service::call
should be revisited with the context of backpressure. The original trade offs don't necessarily make sense anymore.
Thruster looks like a more fleshed out tokio-minihttp.
It's turning out to be an anti-pattern to put constructor functions that return tuples on structs.
I'm looking to implement tokio-tower::pipeline::Client
, and as part of that I need to be able to "drive" the Service
implementation forward to send pending requests, publish responses etc. Currently, the only way to do that is to do all the work in poll_ready
. While that works just fine, it feels like a bit of an abuse of the API. @carllerche mentioned here that we might want a Service::drive
method where we do this kind of heavy-lifting.
Is the error permanent or can calling poll_ready
again succeed.
I just wanted to write up some thoughts on tower::Service and async methods
Here is the summary:
fn call
could ever become async fn call
, because lifetime capture is not how async fn
works.fn call
is right in the sweet spot where we could move on making -> impl Future
work for it soon, allowing you to use an async
block inside of it.The rest of this issue documents all of these issues in detail.
fn call
and async fn lifetimesAn async fn
captures all lifetimes in its inputs in the future it returns - this is the result of the fact that the entire function body is captured in the future it returns. That is, async fn call
would have to capture the lifetime of &mut self
, having a signature more like this:
pub trait Service {
type Request;
type Response;
type Error;
type Future<'a>: Future<Output = Result<Self::Response, Self::Error>> + 'a;
fn call<'a>(&'a mut self, req: Self::Request) -> Self::Future<'a>;
}
An astute observer will notice that this requires type Future<'a>
: that is a generic associated type, which you may be aware is a feature that does not work today but that we are working on changing the compiler to make it work. This is the big blocker for allowing async methods today.
What this means is that the service trait will never have an async fn call
in it. What you will be able to do someday, hopefully, is use an async
block inside call, to construct the future it returns.
There's a big problem with using async blocks in Service, though: an async block evaluates to a unique anonymous type, just like a closure would. This means you cannot write what the associated type Self::Future
is, e.g.:
impl Service for Something {
type Future = ???; // impossible to name
fn call(&self, req: Self::Request) -> Self::Future {
async { ... }
}
}
There are two solutions to this problem:
impl Trait
- remove the Future associated type, and change call to return -> impl Future
Neither of these are enabled today, because we don't allow impl Future
in trait methods. However, for the same reason that call cannot be an async fn
, there is no technical blocker on allowing it in this case: we could allow impl Trait
in traits when they are analogous to normal associated types, as long as they wouldn't be generic associated types (basically: no generic parameters on the method and no + 'a
in the return type).
The final thrust of all this is that right now we could implement a very conservative, very minimal version of impl Trait in traits, and it would happen to support changing Service
to look like this:
pub trait Service<Request> {
type Response;
type Error;
fn poll_ready(&mut self) -> Poll<(), Self::Error>;
fn call(&mut self, req: Request) -> impl Future<Item = Self::Response, Error = Self::Error>;
fn ready(self) -> Ready<Self>
where
Self: Sized,
{ ... }
}
This would allow users to implement call with a Futures03Compat(async { })
expression, using async/await syntax, or else with combinators (which also usually involves anonymous closure types).
The question for y'all then, would you be interested in experimenting with this (probably just on git master, not released to crates.io)? I know Buoyant is using tower a lot internally, it would be great to get more experience from y'all using async/await syntax. If you are, we could possibly prioritize getting a conservative impl trait in traits enabled.
Respond
didn't pan out as a name, in part because the request counter part is ambiguous. Is Request
a verb or noun?
specifically the link to the 'crate documentation'.
Currently, Service
represents the request as an associated type. This was originally done to mirror Sink
in the futures
crate (relevant issue). However, requests are an "input" to Service
which can make representing some things a bit tricky.
For example, HTTP services accept requests in the form of http::Request<T: BufStream>
. In order to implement a service that handles these requests, the service type must have a phantom generic:
struct MyService<RequestBody> {
// ...
_phantom: PhantomData<RequestBody>,
}
impl<RequestBody> Service for MyService<RequestBody> {
type Request = http::Request<RequestBody>;
}
This, in turn, can cause type inference issues as the request body must be picked when MyService
is initialized.
The Service
trait would become:
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Item = Self::Response, Error = Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error>;
fn call(&mut self, req: Request) -> Self::Future;
}
This moves the request type to a generic.
The full impact of this change is unknown. My best guess is it will be mostly ergonomic. For example:
Before:
fn foo<S: Service>(service: S) { }
After:
fn foo<R, S: Service<R>>(service: S) { }
And it linearly gets worse on the number of generic services:
Before:
fn foo<S1, S2>(s1: S1, s2: S2)
where
S1: Service,
S2: Service<Response = S1::Response>,
{ }
After:
fn foo<S1, S2, R1, R2>(s1: S1, s2: S2)
where
S1: Service<R1>,
S2: Service<R2, Response = S1::Response>,
{ }
today there already are plenty of cases in which the one has to add additional generic parameters. In order to bound an HTTP service today, the following is required:
fn foo<S, B>(service: S)
where
S: Service<Request = http::Request<B>>,
B: BufStream,
{ }
In practice, this turns out to be quite painful and it gets unmanageable quickly. To combat this, "alias" traits are created. It is unclear how this change would impact the ability to use this trick.
In order to evaluate the impact of the proposed change, the change has been applied to Tower, and a number of projects that use Tower.
impl<Request, Response, Error, FutureResult, F> Service<Request> for F
where
FutureResult: Future<Item = Response, Error = Error>,
F: Fn(Request) -> FutureResult,
{
type Response = Response;
type Error = Error;
type Future = FutureResult;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, request: Request) -> Self::Future {
self(request)
}
}
I try to implement the service trait for the function type (Fn(xxx) -> yyy) using above code in tower-service
crate. However, it has some conflict with &mut S
and Box<S>
implementation.
error[E0119]: conflicting implementations of trait `Service<_>` for type `&mut _`:
--> tower-service/src/lib.rs:328:1
|
294 | / impl<'a, S, Request> Service<Request> for &'a mut S
295 | | where
296 | | S: Service<Request> + 'a,
297 | | {
... |
308 | | }
309 | | }
| |_- first implementation here
...
328 | / impl<Request, Response, Error, FutureResult, F> Service<Request> for F
329 | | where
330 | | FutureResult: Future<Item = Response, Error = Error>,
331 | | F: Fn(Request) -> FutureResult,
... |
343 | | }
344 | | }
| |_^ conflicting implementation for `&mut _`
|
= note: downstream crates may implement trait `std::ops::Fn<(_,)>` for type `&mut _`
error: aborting due to previous error
For more information about this error, try `rustc --explain E0119`.
error: Could not compile `tower-service`.
To learn more, run the command again with --verbose.
Does anyone have some suggestions to correct this?
A placeholder for figuring out how to correctly handle back pressure w/ Service
. I have attached notes I wrote a while ago, but my thoughts have evolved since. I haven't written them down yet.
In this strategy, there is no back pressure strategy built into a service implementation. Each service implementation always accepts a request and starts processing the request, queuing if necessary when upstream resources are not available.
The assumption is that, the caller of service (tokio-proto) will maintain a maximum number of outstanding futures for a given connection. For example, a common number for an HTTP/2.0 implementation is 100 in-flight requests.
The problem here is that this strategy only really works if each connection is independent. If all connections require access to a global resource, say a global queue that has the buffer set to 1,000 and there are 100k open connections, the number of outstanding requests will start to backup heavily. In this case, 10 open connections are able to fill the global queue, causing 999,990 other connections to effectively be able to buffer 100 requests each.
The ideal situation here is that, as the global queue becomes full, tokio-proto stops accepting new connections.
This strategy modifies Service::call
to align more with the Sink
API. If a service is not ready to accept new connections, a call would return immediately with AsyncService::NotReady(request)
. When the service becomes ready, the task is notified and the request can be attempted again. When tokio-proto and other callers of Service
receive an AsyncService::NotReady(req) the common response will be to buffer the returned request until the service becomes ready again and to not generate any further requests until that buffer slot is cleared. In rarer cases, it may be possible to fail over to a secondary service.
This strategy creates significant complexity for all implementations of Service. First, it exposes AsyncService and requires understanding that concept, then every middleware needs to be able to return the original request if the upstream is not ready.
BEFORE:
impl<T, P> Service for ClientService<T, P> where T: 'static, P: ClientProto<T> {
type Request = P::Request;
type Response = P::Response;
type Error = P::Error;
type Future = ClientFuture<T, P>;
fn call(&mut self, req: P::Request) -> Self::Future {
ClientFuture {
inner: self.inner.call(Message::WithoutBody(req))
}
}
}
AFTER
impl<T, P> Service for ClientService<T, P> where T: 'static, P: ClientProto<T> {
type Request = P::Request;
type Response = P::Response;
type Error = P::Error;
type Future = ClientFuture<T, P>;
fn call(&mut self, req: P::Request) -> AsyncService<Self::Future, P::Request> {
match self.inner.call(Message::WithoutBody(req)) {
AsyncService::Ready(f) => {
AsyncService::Ready(ClientFuture {
inner: f,
})
},
AsyncService::NotReady(req) => {
match req {
Message::WithoutBody(req) => AsyncService::NotReady(req),
_ => panic!("wat"),
}
}
}
}
}
If the middleware mutates the request in such a way that is not reversible and the upstream returns AsyncService::NotReady, then the middleware has no choice but to error the request, which kind of defeats the purpose.
Also, a service must be able to determine if the request can be processed immediately, which can cause additional complexity in the case where a service implementation does something like:
upstream_a.call(request)
.and_then(|resp_a| upstream_b.call(resp_a));
It is unclear how to handle upstream_b.call returning AsyncService::NotReady since the original call has already returned. The only solution that I can think of is to buffer resp_a
and set a flag on the middleware to not accept any more requests.
AsyncService does not handle the โrouterโ problem where one route may be ready but another one is not. If a request sent in results in AsyncService::NotReady, the caller will either have to stop all further requests or buffer the requests that are rejected while sending in new requests. This โpendingโ buffer could be large. Also, for every โtickโ of the event loop, the service will need to attempt to flush all buffered requests even if only a single one may be ready (itโs also unclear how the service flushes the buffered requests since there is no โpokeโ). If a buffer is introduced, it is unclear why the router doesnโt manage the buffer itself w/ the poll_ready
strategy. It would be much simpler.
This seems to imply that the right thing to do when a service is overloaded is to resolve the response future as an error. The problem is then, how does the caller know when the service will accept another request?
In this strategy, Service
has an additional function: poll_ready()
. The function returns Async::Ready when the service is ready to accept another request. The caller then sends the request, which returns a response future as it โknowsโ that the service can accept the request.
Questions would be, is poll_ready() a guarantee or can it return false positives? If it does return a false positive, how does a service respond to a call when there is no availability? I would suggest that this can be left up to the service implementation (maybe it is a configuration setting) and maybe we provide some best practice hints? There really would be two options, either the service accepts the request, buffers it, and returns a response future or the service returns an error future. Both seem like they could be acceptable depending on the situation. Note, that if the service returns an โout of capacityโ error, the poll_ready function should provide help to the caller to determine when to resume sending requests.
Another question is how to handle services that are conditionally ready depending on the details of the request. For example, a router may have one route that is ready and another one that isnโt. The way this would work with poll_ready would be that the router would be configured with a max buffer size per route. When a routeโs buffer is exceeded, the router can either disable the entire router (poll_ready returns not ready) or it can error further requests on the route and keep the router service โreadyโ.
I believe that this strategy is simpler and provides the same capabilities as option 2.
Service specific back pressure strategies could be employed as well. For example, in the router case, when a route is not ready, the router could error the request but provide back pressure information in the error:
struct RouteUnavailable<R> {
request: R,
route: String,
ready: impl Future<()>,
}
In this case, the error includes the route that was unavailable as well as a future representing that specific route becoming available again. The application can then handle the back pressure signal or just ignore the error and the request is aborted.
Another strategy could be having a โstateโ on the response future:
enum ResponseState {
Healthy,
Distressed,
}
Services could then always accept all requests, but if there is a back pressure situation going on, the response future will be in the โdistressedโ state. The service caller could then decide if it wants to abort the request (drop the response future) or buffer itโฆ
It seems like these structs would be the same.
#76 defines WithPeakEwma
which takes a T: Discover
and implements discover. However, the struct would be exactly the same if it implemented NewService
.
I am wondering if the pattern should be to define NewPeakEwma<T>
and to define both NewService
when T: NewService
and Discover
when T: Discover
.
poll_ready
does the check, but call
currently assumes poll_ready
is always called before hand.
The ReadyService
trait is new and less proven than Service
, and we don't want to experiment and realize it needs changes that require a breaking change release for the base tower
crate.
Instead of having a current_message
slot, perhaps the worker should "join" on the service and the request queue being ready.
Thoughts @jonhoo
Due to Buffer
using a futures::sync::mpsc
channel, any ResponseFutures
that have been dropped will continue to consume space in the queue until the underlying service has progressed through the requests in front of them. A Buffer
could be wrapped in Timeout
, which could cancel the requests if waiting took too long. The oneshot::Sender
still being somewhere in the queue means the buffer's capacity could become full of canceled requests.
There's the additional issue that a wrapped Reconnect
may wish to only retry a failed connect if there are still response futures waiting, but being in the queue makes it impossible to determine that.
This is kind of the "other" half of a pool. hyper does have a queue of waiters internally, and can check when they are canceled, since they are actually in a VecDeque
. To allow new requests to enter this queue, it's wrapped in an Arc<Mutex>
. While perhaps not the best thing in the world, it does work, and people still get excellent performance from hyper's client, so we could consider that as a first pass.
Related Conduit issue: linkerd/linkerd2#899
I'm aware #5 just landed, and I should have brought this up during that review, but hopefully you will indulge me.
Generally, I'm in favor of reducing the number of associated types wherever possible. In my experience over-use of associated types makes everything more difficult, from implementing and working with the trait in code, to reading the rustdoc for the trait and its implementations.
I propose changing the current definition of NewService
, which contains 6 associated types, to:
/// Creates new `Service` values.
pub trait NewService {
/// The `Service` type created by this factory.
type Service: Service;
/// Errors produced while building a service.
type Error;
/// The future of the `Service` instance.
type Future: Future<Item = Self::Service, Error = Self::Error>;
/// Create and return a new service value asynchronously.
fn new_service(&self) -> Self::Future;
}
I don't think this exact permutation was suggested in #5, but there was some discussion between @carllerche and @withoutboats about associated types of associated types being ambiguous (e.g. MyNewService::Service::Request
is currently invalid).
In my estimation, reducing the number of associated types will make it easier for beginner and intermediate users of tower
, up to an including people implementing NewService
for their own types. If I understand correctly, the place that the associated types of types issue bites is when writing code which is generic over NewService
and needs to reference a type from the associated Service
. Basically, I'm positing that whoever is writing this is an advanced user, and extra <MyNewService::Service as Service>::Request
syntax isn't That Bad(tm).
Buffer
requires an executor to spawn tasks. Currently, constructing a buffer requires an explicit executor to be passed in. Instead, Buffer::new
should default to DefaultExecutor
.
Though, currently Buffer::new
spawns the task eagerly in new
, but often times the Buffer
instance is created from outside of the runtime before, in which case DefaultExecutor
won't work.
There would be a new constructor fn Buffer::new_with_executor
(though, the name might be improved) that takes an executor.
Buffer
should also be updated to avoid exposing the concrete task type that needs to be spawned. We can opt for the strategy described here.
The Service
task is never notified when the service becomes ready (number of in-flight requests drop below the max).
This has been requested in the past by @olix0r.
Doing so would allow providing additional context. Given that NewService
is most often called to process a new connection, the argument could be used to pass connection level data for the service to consume.
Ideas:
Do we have an example of Unix domain socket support?
We have built up a number of patterns that were developed from our usage of Tower. We should document these.
This is a meta issue to track the patterns that should get documented.
Service + Clone
to allow moving an inner service into a response future.Service
for Arc<MyType>
to get around &mut
requirement.cc/ @seanmonstar, @olix0r, @hawkw: I'm sure I'm forgetting patterns here. Thoughts?
Does it make sense to provide an abstraction for the other side of the NewService handshake - the trait implemented by various protocol server implementations?
Something like this?
trait NewServer {
type Request;
type Response;
type Error;
type Server;
fn bind<S>(&self, new_service: S) -> Result<Self::Server, Self::Error>
where S: NewService<Request = Self::Request, Response = Self::Response, Error = Self::Error>;
}
This question was raised by @withoutboats for Service
. I suggested that there are cases in which having a Service
impl that is not clone is desirable. However, I don't think that those reasons apply to ReadyService
.
Having ReadyService
extend Clone
should be explored.
The basic idea is that Service
implementations may have a Response
value that contains references, rather than owned data. #99 would make it possible to have the type Response
depend on lifetimes in the Request
type parameter, and so would allow Response
to reference the Request
object, which gets us part of the way. However, there still wouldn't be a way to reference items created inside the Future
returned from call
, and if the Request
was owned (no lifetimes) there wouldn't be a way to return references into it, only owned objects that contain bits from it. If call
wants to do e.g. zero-copy parsing using serde/nom and then return the parsed object/AST (which references the Request
buffer), this still wouldn't be possible.
One solution that I've been using in Fuchsia is to sort of CPS-transform the request-response pattern, allowing the Service
implementation to call into whatever is receiving the Response
. Applied to the Service
trait (and using GATs), this could look something like this:
trait Service<Request, Responder: for<'a> Sink<Self::Response<'a>>> {
type Response<'a>;
type Error;
type Future: Future<Item = (), Error = Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error>;
fn call(&mut self, req: Request, res: Responder) -> Self::Future;
}
// Example impl:
impl<Buf, Responder> Service<Buf, Responder>, for BorrowedJsonTyDeserializer
where
Buf: AsRef<[u8]>
Responder: for<'a> Sink<Self::Response<'a>>
{
type Response<'a> = BorrowedJsonTy<'a>;
type Error = io::Error;
exists type Future: Future<Item = (), Error = Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { Poll::Ready(()) }
fn call(&mut self, req: Buf, res: Responder) -> Self::Future {
async move {
await!(res.send(BorrowedJsonTy::from_slice(req.as_ref())?)?)
}
}
}
Obviously this is pretty fuzzy and relies on a feature (GATs) that isn't implemented yet, but I think it gets across the general idea of what I'm trying to achieve.
Another related transformation that we do in Fuchsia is to model most request/response patterns as Stream
s that yield a (req, response_channel)
pair, which makes explicit backpressure like poll_ready
unnecessary, as the code processing the stream gets to choose how often to pull items off. If you wish to process up to 100 elements at a time, you can do something like request_stream.for_each_concurrent(|(req, res)| ...res.send(...) ... )
. This pattern works out really well for us and is often more ergonomic than trying to make a trait like Service
fit into the equation. The res.send(...)
methods we use frequently take references (including trait objects) which isn't possible with the current Service
trait.
Please let me know if y'all have other ideas about how to allow these types of borrowed messages. I'm hopeful that together we can come up with a way to allow ergonomic borrowed response objects like this.
P.S. One thing that comes up often is that the res
objects all want to write into a shared resource, e.g. a TcpSocket
. This works out naturally in the "return a Response
object" formulation because the thing calling Service::call
can choose when to push objects into the socket in series, but in the CPS formulation is a bit trickier since you have to notify each writer in turn when the socket is readable and it's that particular writer's turn to complete. This requires some more clever wakeup logic that would be nice to get rid of ;)
It's not critical to always specify this.
Heya, I was looking at the docs, and at the top it points to https://tower.rs/ for more information.
The site itself does self-signed HTTPS it seems, and the unsafe http version doesn't seem particularly related to this project.
I'm not sure what the status of the domain is, but perhaps it might be useful to remove the reference to the site until a change has been made?
Thanks for your time; excited for how the project is shaping up! โจ
Currently, the tower-reconnect
middleware tries to reconnect immediately if an error occurs while connected, but propagates all errors if an error occurs while connecting. We should modify this middleware to:
Context: #6. The question: should Service
have Clone
as a supertrait?
@carllerche:
Clone
can be added as a decorator around a service IMO that provides some level of configurable buffering.
Does this mean you have an idea for a wrapper which takes any Service
type and produces a Service + Clone
type? If this works it at least alleviates my concern; users won't be totally out of luck if they need to clone a Service, they just need to apply this wrapper.
I don't see how it would be unusable? It's very usable in the case of sequential use. You can only have one in-flight request at the time.
So the model that I'm concerned about is this:
struct Endpoint {
redis: RedisService,
postgres: PostgresService,
}
impl Endpoint {
fn call(&mut self) -> impl Future<Item = Response, Error = Error> {
self.redis.call(redis_request).and_then(|redis_response| {
self.postgres.call(postgres_request);
...
}
}
}
// or someday
impl Foo {
async fn call(&mut self) -> Result<Response, Error> {
let redis_response = await!(self.redis.call(redis_request))?;
await!(self.postgres.call(postgres_request))?;
...
}
}
This results in ownership errors because the future needs to own the postgres handle. If the postgres handle isn't Clone
, users simply can't write this code.
If all they need to do is apply some wrapper around the handle, that's not as serious. But even then I suspect tower is going to get a similar reputation issue that tokio has had, so I'm wondering how much the performance impact of requiring every Service to build in its Clone
aspect would actually be.
https://github.com/carllerche/tower/blob/d5f253c577a98210f4bda336677da95dd91142b0/src/lib.rs#L176
I'm not sure what the original reasoning was, probably to have less generics. That said, it would probably be worth revisiting.
Are there plans to support graceful termination of services? E.g. in Finagle Close() method returning Future is a core part of the Service class.
Most of the various middlewares in tower define an enum Error<E> { Inner(E), .. }
to return from Service
. A problem starts to arise once you compose multiple middlewares together, but still want to be somewhat generic over handling the errors. Look at this gem:
error: Inner(Inner(Inner(B(Service(Inner(A(B(Error { kind: Connect, details: "interesting" }))))))))
In this case, we've wrapped a hyper::Client
in a bunch of middleware. In order to inspect and react to the error (in my case, decide if the error qualifies to retry the request), I could:
match e { UnNestAllTheLayers => () }
. This is very fragile, since adding new middleware, or changing the order, will cause compilation to fail. It's also hugely tedious being sure we've match every correct combination.trait CanRetry
and implement it for every single intermediary type. Adding a new middleware means we need to implement CanRetry
for the potentially new nested error enum.A different solution might be better: remove the enum Error<E>
, and instead having all middleware just return Box<dyn std::error::Error + Send + Sync>
.
The code needed to inspect errors if they were dynamic trait objects changes to something like this:
if let Some(e) = error.downcast_ref::<hyper::Error>() {
if e.is_connect() { // or whatever
return CanRetry::Yes;
}
}
Consider tower-in-flight-limit
:
impl<S, Req> Service<Req> for InFlightLimit<S>
where
S: Service<Req>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Response = S::Response;
type Error = Box<dyn StdError + Send + Sync>;
// ...
}
impl<T> Future for ResponseFuture<T>
where
T: Future,
T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Item = T::Item;
type Error = Box<dyn StdError + Send + Sync>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner {
Some(ref mut f) => {
match f.poll() {
// ...
Err(e) => {
self.shared.release();
Err(e.into())
}
}
}
None => Err(NoCapacity.into()),
}
}
}
This change means that if an error occurs, it will be a Box<dyn StdError + Send + Sync>
, but it won't be nested (unless nesting with Error::cause()
(now in 1.30, Error::source()
!)), and so it should only be either S::Error
or NoCapacity
.
By requiring the bounds of S::Error: Into<Box<dyn StdError + Send + Sync>>
, this allows only boxing the error once. If multiple middleware are wrapped up, and where they would previously return Error::Inner(e)
, they'll just return the Box<dyn StdError>
without wrapping.
A couple downsides exist with this change:
Send
and Sync
being propagated through the generics, so we would need to require S;:Error: Send + Sync
. In practice, it seems reasonable that errors would be Send + Sync
anyways.enum
, it didn't. The error cause should be infrequent, so the allocations shouldn't happen too often. However, this actually can mean an performance improvement, since Result<T, Box<Whatever>>
can sometimes remove the need for the tag in Result
, if T
is zero-sized. Additionally, nested enums grow in memory, so having a single Box
can means the Result
is smaller regardless.enum Error
can be nice to ensure you handle every error case, and this proposal would force you to do downcast even in those simple cases.Currently, mapping the error type of a response future is pretty painful as you have to create a ton of new types just to wrap it... This could be handled by a map_err
combinator.
Unfortunately, using this would really require rust-lang/rust#44490.
For idempotent services, it can be useful to issue speculative requests to alternate replicas when the original request is slow in completing. This can smooth over latency outliers at the cost of wasted execution. For an example from a real world system, see the HDFS 'hedged reads' feature. This relates to #14, since you typically want to design the retry and speculative execution policies in tandem.
Currently, tower_buffer::Buffer::new
returns a SpawnError<T>
, defined like so:
pub struct SpawnError<T> {
inner: T,
}
The T
here is the Service
passed to Buffer::new
. However, there is no way to extract the inner
from SpawnError
, so Buffer::new
's caller has no way to recover the transport on failure.
This is a meta issue tracking concepts that should be included as part of the documentation.
We can check the status of a Service
with poll_ready
, however, that has only 3 states:
A Service
may have closed cleanly, in which case none of those 3 states are correct. The current best choice is to return an error, and hope the error message is sufficient to say it was a clean close. This problem is similar to how before Rust 1.0, a Read
closing would return an Err
of Eof
. Before 1.0, it was changed such that an end-of-file isn't an error, but instead reported as Ok(0)
.
Some examples of closing cleanly:
We could similarly make closing a clean Ok
status, instead of an error:
// Proposed change
enum Status {
Open,
Closed,
}
fn poll_ready(&mut self) -> Poll<Status, Self::Error> {
}
As a side note, it'd be really useful at times to be able to check is_ready
and is_closed
, without being in a task context. For instance, a wrapped Service
that returns it to some pool on Drop
could want to check if the underlying Service
is closed. Other cases are to allow for call
to check if an underlying service is ready even if poll_ready
wasn't called previously.
The DirectService
trait (introduced in #118) is an alternate version of Service
that is being experimented with. It provides the necessary hooks to "drive" a service's internal state machine forward. This allows avoiding to have to spawn new tasks to power service implementations.
We should consider whether or not this trait should be promoted to be the primary Service
trait.
When introducing DirectService
, we extensively discussed whether or not the functionality should just be provided by Service
. We concluded that it should not. The primary reason is that middleware is not always able to drive the inner service. For example, a router cannot effectively call poll_service
as needed. Because of this, there are service implementations that driven (require poll_service
to be called) and there are service implementations that are buffered (requests get dispatched on a channel to another task). These services do not require poll_service
to be called. Initially, we could not figure out a good way to implement middleware that could indicate which kind of service it wanted, so we opted for two completely separate traits.
It occurred to me that we could combine DirectService
with Service
and use Clone
as an indicator of whether or not the service is driven vs. buffered. The router must already accept T: Service + Clone
. The strategy taken is, when a request comes in, the request is routed to an inner T: Service
and that T
is cloned into the router's response future. T::poll_ready
is called by the response future. Services that are unable to call poll_ready
will all most likely take a T: Service + Clone
.
So, the proposal is to make Service
what DirectService
is today. Service
implementations like Buffer
would have no-op poll_service
and poll_close
implementations.
The other con is that implementing Service
becomes more complicated. This is a real drawback, but I believe it can be mitigated by providing utilities for common patterns:
service_fn
for easily defining leaf services.For example, basic logging could be implemented as:
service_fn(|request| process(request))
.before(|request| println!("request = {:?}", request))
.after(|response| println!("response = {:?}", response))
And more complicated:
service_fn(|request| ...)
.around(|request, next| {
let request = request.clone();
next.call(request)
.inspect(move |response| {
println!("request = {:?}; response={:?}", request, response);
})
})
The promotion could be done in a few steps.
DirectService
-> Service
but keep it in the tower-direct-service
crate.tower_direct_service::Service
instead of tower_service::Service
tower_service::Service
with tower_direct_service::Service
and release tower-service
0.3.Refs #137
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.