tokio-io's Introduction

The tokio-io crate has been moved into the tokio Git repository.

Do not use this repo anymore.

tokio-io

Core I/O abstractions for the Tokio stack.

Usage

First, add this to your Cargo.toml:

[dependencies]
tokio-io = "0.1"

Next, add this to your crate:

extern crate tokio_io;

You can find extensive documentation and examples about how to use this crate online at https://tokio.rs. The API documentation is also a great place to get started for the nitty-gritty.

License

This project is licensed under either of

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Tokio by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

tokio-io's Issues

length_delimited should allow using a custom Codec

I can't find a way to use a custom Codec in conjunction with length_delimited. I'm currently working on a PR to implement varint support for length_delimited (an untested work in progress can be found here). The use case is protobuf with a varint length delimiter. Because length_delimited::Framed implements only Stream and Sink, not AsyncRead and AsyncWrite, it's not possible to chain a Codec onto it with AsyncRead::framed in order to deserialize the returned BytesMut, or to serialize items into a BytesMut before length-delimiting them.

My suggestion would be to implement AsyncRead and AsyncWrite on Framed, and likewise on FramedRead and FramedWrite.
Another idea is to allow adding a custom Codec to Builder. It would be invoked after a full frame has been read into a BytesMut (for deserialization), and before length-delimiting to produce the BytesMut (for serialization).
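
To make the use case concrete, here is a minimal sketch of varint length-delimited framing. It is not the tokio-io API: it works on a plain Vec<u8> instead of BytesMut, the names encode_frame and decode_frame are made up for illustration, and malformed length prefixes are not handled.

```rust
/// Appends `payload` to `dst`, prefixed by its length as a protobuf-style
/// base-128 varint (7 bits per byte, least significant group first).
fn encode_frame(payload: &[u8], dst: &mut Vec<u8>) {
    let mut len = payload.len() as u64;
    loop {
        let byte = (len & 0x7f) as u8;
        len >>= 7;
        if len == 0 {
            dst.push(byte);
            break;
        }
        dst.push(byte | 0x80); // continuation bit: more length bytes follow
    }
    dst.extend_from_slice(payload);
}

/// Tries to split one complete frame off the front of `src`.
/// Returns `None` when more bytes are needed, mirroring the `Option`
/// convention of `Decoder::decode`.
fn decode_frame(src: &mut Vec<u8>) -> Option<Vec<u8>> {
    let mut len: u64 = 0;
    // A u64 varint occupies at most 10 bytes.
    for (i, &byte) in src.iter().enumerate().take(10) {
        len |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            let start = i + 1;
            let end = start.checked_add(len as usize)?;
            if src.len() < end {
                return None; // prefix parsed, payload still incomplete
            }
            let payload = src[start..end].to_vec();
            src.drain(..end);
            return Some(payload);
        }
    }
    None // the length prefix itself is incomplete
}
```

A custom Codec hooked into Builder would then only see the payload returned by decode_frame, which is where the protobuf deserialization would go.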

FramedRead updates unusably slowly

I'm attempting to parse the output of a child process with a FramedRead and a simple line codec using tokio-process. However, my decode is only called once every 8192 bytes, which can take anywhere from seconds to hours to accumulate. FramedRead should attempt to decode every time the pipe that tokio-process sets up is written to.

I notice INITIAL_CAPACITY is hard-coded to 8192. Is this the problem?

Encoder::encode() should take an Item reference

In Encoder::encode(), an Item gets encoded into a BytesMut. However, the Item is currently moved into the encode() call, where a non-mutable reference would seem to make more sense. Since the encode() method argument's type is directly tied to the trait type, the lifetime bound would somehow have to be related to the trait or the implementing struct, which I don't think makes sense in this case: the lifetime should only be bound by the duration of the method call.

Does this make sense? It seems like it would clash with #26, if I'm not mistaken?

Decoder::decode is slow for large buffers

I am new to async I/O, Rust, and tokio, so forgive me if I have diagnosed the problem incorrectly.
My frame consists of a header and data. The header gives the length of the data, which in my case tends to be large, say 1 MB or more.

As I understand it, Decoder::decode is supposed to return None when there are not yet enough bytes to complete the frame. The header is parsed quickly, but for the large content that follows it the decoder keeps returning None for a long time, and things slow down considerably.

Is there a way for the decoder to hint to FramedRead how much data it is expecting? If so, FramedRead could read exactly that many bytes and only then call Decoder::decode.
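
One possible form of such a hint, sketched under assumptions: the decoder grows the buffer to the full frame size as soon as the header is known, so a framing layer that reads into spare capacity needs fewer, larger reads. The frame layout (4-byte big-endian length, then payload), the use of Vec<u8> instead of BytesMut, and the name decode_sized are all illustrative. Whether FramedRead actually benefits depends on how it fills the buffer.

```rust
const HEADER_LEN: usize = 4;

/// Hypothetical frame layout: 4-byte big-endian payload length, then payload.
/// Returns `None` while the frame is incomplete.
fn decode_sized(src: &mut Vec<u8>) -> Option<Vec<u8>> {
    if src.len() < HEADER_LEN {
        return None; // header not complete yet
    }
    let mut header = [0u8; HEADER_LEN];
    header.copy_from_slice(&src[..HEADER_LEN]);
    let total = HEADER_LEN + u32::from_be_bytes(header) as usize;
    if src.len() < total {
        // The "hint": reserve room for the rest of the frame in one step,
        // instead of letting the buffer grow a little on every read.
        src.reserve(total - src.len());
        return None;
    }
    let payload = src[HEADER_LEN..total].to_vec();
    src.drain(..total);
    Some(payload)
}
```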

Consider adding `io::read_repeating`

The current BytesCodec implementation is convenient, but as discussed it requires allocating a BytesMut for the data. There should be an easy way to get a stream of bytes out of an AsyncRead without allocating.

One option is to use io::read and stream::unfold:

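// Assumes futures 0.1 (`Future`, `stream`) and `tokio_io::io` are in scope,
// `read` is an `AsyncRead`, and `process_buf` is user-defined.
let buf = [0; 1024];
struct LoopState<R, B> { is_first: bool, read: R, buf: B, num_read: usize }
let init = LoopState { is_first: true, read, buf, num_read: 0 };
let value_stream = stream::unfold(init, |state| {
    // A zero-length read after the first iteration means EOF: end the stream.
    if state.num_read == 0 && !state.is_first { return None; }
    Some(io::read(state.read, state.buf).map(|(read, buf, num_read)| {
        // Only the first `num_read` bytes of the buffer are valid.
        let output = process_buf(&buf[..num_read]);
        (output, LoopState { is_first: false, read, buf, num_read })
    }))
});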

This is pretty unergonomic, though. Consider adding a function like this:

pub fn read_repeating<R, B, F, Fut, It, E>(read: R, buf: B, f: F) -> RepeatingRead<R, B, F, Fut, It>
where
R: AsyncRead,
B: AsMut<[u8]>,
F: FnMut(&[u8]) -> Option<Fut>,
Fut: Future<Item=It, Error=E>,
E: From<io::Error>,

where RepeatingRead<R, B, F, Fut, It> implements Stream<Item = It, Error = E>.

Using it would look like this:

let value_stream = io::read_repeating(read, [0; 1024], |bytes| process_buf(bytes));

This could potentially help with tokio-rs/tokio-core#276.

However, it also feels like this overlaps quite a bit with the existing encoder/decoder functionality. It seems like the main difference is passing in a reference to a user-provided buffer instead of yielding a BytesMut. Comparatively, read_repeating is limiting to the end-user because they must either be able to yield an item from any incoming chunk of data, or they must save intermediate state in their closure upvars. In return, read_repeating reduces the number of allocations.

BytesMut lacks a direct implementation of std::io::Write

I'm currently using the codec module to implement a framed transport.

Before the refactoring, the encode-method of the old Codec trait received a Vec<u8> as parameter to serialize the message into. Vec<u8> directly implements std::io::Write which makes it very easy to put any kind of data into it, since most implementations have some way of writing into an instance of Write.

Now the Vec<u8> has been replaced by BytesMut, which lacks a direct implementation of that trait. Write compatibility is only achievable through into_writer, which consumes the BytesMut. Consuming it is not possible, though, because the method only receives the BytesMut by &mut.

Is this desired behavior?

Reconsider including `length_delimited` in the tokio-io crate

It could easily live in a dedicated crate, which would also allow it to iterate on API breaking changes faster. For example, #71 has been left unaddressed as it requires breaking the API.

For discoverability, the tokio-io docs can reference any additional crates that provide higher level functionality.

Question: How to combine different layers of a protocol?

I hope this is the right place to ask.
I wonder where to place the different layers of a protocol, and where to handle stateful operations such as adding headers with transaction IDs or verifying checksums.
Let's say there are two layers: the transport layer and the high-level protocol. The frames are nested like this:

  • transport A: [HEADER_A][PDU][CHECKSUM_A]
  • transport B: [HEADER_B][PDU]
  • transport C: [PDU]
  • PDU protocol A: [PDU_HEADER,DATA]
  • PDU protocol B: [ID,META_DATA,DATA]

How would you structure the implementation? What kind of work should be done by a codec and what should be done in higher level instances like a ClientProto? And how to create "nested codecs", where e.g. PduCodecA is used by a TransportBCodec?

Async shutdown for IO

Some things that would previously have implemented Io need to perform read/write operations before closing the socket. An example is TlsStream, as provided by tokio-tls. For higher-level abstractions, such as a Transport over Io (now AsyncRead + AsyncWrite), the knowledge of whether a shutdown is necessary isn't available. This should probably be implemented on the stream (TlsStream) through a trait, and the wrapping Transport should have some trait method to call.

This could have a default implementation that just returns futures::ok(), and specific streams can then implement it as they need. I can't easily say whether it should be on AsyncWrite, since something may require both reading and writing in order to communicate a clean shutdown.

Potential bugs in codec::FramedRead

I'm a bit concerned about potential bugs introduced when porting FramedRead to use bytes::BytesMut.

BytesMut does no implicit allocation. Once the capacity of the buffer has been reached, it can no longer accept more data. This means that it could be possible for this call here to be passed a zero length buffer:

https://github.com/alexcrichton/tokio-io/blob/463e0365a556e1e12508d3b5ff811fc78b6d9750/src/framed_read.rs#L217

That would be interpreted as an EOF.

In general, how to deal with growing the buffer needs some thought. Growing the buffer could be left to the Decoder implementation, but that may be unexpected...
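
The ambiguity can be shown with std alone: a read into an empty buffer returns Ok(0) even though the source still has data, which a naive check cannot tell apart from EOF. The function names below are illustrative, and checked_eof is only one possible guard, not what FramedRead does.

```rust
use std::io::{self, Read};

/// Naive EOF check: treats a zero-byte read as end of stream.
fn looks_like_eof<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<bool> {
    Ok(reader.read(buf)? == 0)
}

/// Guarded variant: refuses to read into an empty buffer, so a full
/// buffer is reported as an error instead of being mistaken for EOF.
fn checked_eof<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<bool> {
    if buf.is_empty() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "read buffer has no remaining capacity",
        ));
    }
    Ok(reader.read(buf)? == 0)
}
```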

EINTR handling

I wanted to bring EINTR (ErrorKind::Interrupted) handling up as a general topic.

Traditionally, EINTR on Unix systems has meant that a signal has interrupted this syscall. Usually, the operation can be immediately retried (unlike EAGAIN/EWOULDBLOCK/WouldBlock which would have to be retried later).

How should tokio-io handle EINTR, if at all? For example, this call in FramedWrite::poll_complete can fail with ErrorKind::Interrupted -- this means that a stream forwarding to a FramedWrite can fail with EINTR at this point. This means that a Forward future when run with Core::run will fail and consume the stream and sink, even though it can just be immediately retried.

  • Should FramedWrite retry on ErrorKind::Interrupted?
  • Should try_nb! retry on ErrorKind::Interrupted?

More general questions (perhaps more appropriate for other crates):

  • Should Forward futures have some way to return the stream and sink on error as well?
  • How should this general pattern (Core::run gets passed a future which errored out, but the future is still valid and poll can still be called on it) be handled?
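
For reference, the "retry immediately" option could look like the following sketch. It uses std::io::Write rather than AsyncWrite, and FlakyWriter is a demonstration-only type that simulates EINTR. Whether FramedWrite or try_nb! should do this is exactly the open question.

```rust
use std::io::{self, ErrorKind, Write};

/// Performs one write, retrying immediately while the call is interrupted.
/// Every other outcome, including WouldBlock, is passed through unchanged.
fn write_retrying<W: Write>(writer: &mut W, buf: &[u8]) -> io::Result<usize> {
    loop {
        match writer.write(buf) {
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            other => return other,
        }
    }
}

/// Demonstration-only writer that fails with `Interrupted` a fixed number
/// of times before accepting data.
struct FlakyWriter {
    interrupts_left: u32,
    written: Vec<u8>,
}

impl Write for FlakyWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.interrupts_left > 0 {
            self.interrupts_left -= 1;
            return Err(io::Error::new(ErrorKind::Interrupted, "simulated EINTR"));
        }
        self.written.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```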

Add AsyncBufRead

Because std::io::BufRead is a trait, it allows zero-copy reading from buffers that are already in memory. I propose adding a similar trait to tokio-io. Adapters should also be added, either separate or specialized (I haven't thought through the details).

One use case for the async version is applying an additional transform to half-decoded byte buffers. In hyper, the response body is decoded into chunks, and we then want to apply gzip decoding to them (planned to be implemented in reqwest).

This should benefit tokio-tls as well.

length_delimited::Framed length adjustment

It's a bit weird that it's possible to adjust the length while decoding but not while encoding.
I'm working with a protocol which looks like this:

|~~~   Packet 1   ~~~|~~~   Packet 2   ~~~|~~~  Packet 3    ~~~|~~~

+-------+------------+-------+------------+-------+------------+...
| n=U32 | (n-4) * U8 | n=U32 | (n-4) * U8 | n=U32 | (n-4) * U8 |
+-------+------------+-------+------------+-------+------------+...

|~ Len ~|~   Data   ~|~ Len ~|~   Data   ~|~ Len ~|~   Data   ~|~

It doesn't seem possible to encode a correct packet with the current state of things, or is it?
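
For clarity, this is the encoding the protocol needs, written by hand as a sketch. The length field counts its own 4 bytes. The byte order is not specified above, so big-endian is an assumption here, and encode_packet is an illustrative name, not part of length_delimited.

```rust
/// Encodes one packet whose 4-byte length field `n` counts itself as well
/// as the payload, i.e. n = payload.len() + 4.
/// Assumes big-endian byte order; payloads too large for a u32 length
/// are not handled in this sketch.
fn encode_packet(payload: &[u8], dst: &mut Vec<u8>) {
    let n = (payload.len() + 4) as u32;
    dst.extend_from_slice(&n.to_be_bytes());
    dst.extend_from_slice(payload);
}
```

An encoder-side length adjustment of +4 in length_delimited would produce the same bytes.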

Clarify behaviour of flush on WouldBlock error

Is an implementation of the AsyncWrite trait allowed to block on flushing?

The first sentence of the trait doc comment indicates that this should not be the case:

This trait inherits from io::Write and indicates that an I/O object is nonblocking, meaning that it will return an error instead of blocking when bytes cannot currently be written, but hasn't closed.

But the remainder of the documentation only explicitly talks about how write should handle WouldBlock errors, not about flush.

cc facebookarchive/rust-partial-io#17

Add a "Bytes" Codec Implementation

IIUC, AsyncRead and AsyncWrite pretty much rely on framed in order to be useful in futures-rs-style combinators. However, there's no provided implementation of Encoder and Decoder, so users always have to pull one in from another crate or write their own, even if they just want to read bytes directly from a stream. Consider exposing a Bytes codec similar to the one that already exists in the tokio-core examples.

Document that std write_all is incompatible with non-blocking IO

std::io::Write::write_all is incompatible with non-blocking IO, because it can do partial writes then fail, and the caller cannot find out how many bytes were actually written. This also applies to libraries that might call write_all internally -- they cannot be used with non-blocking IO.

It would be good to document this restriction somewhere. It might also be worth making AsyncWrite panic on write_all?

I've filed rust-lang/flate2-rs#92 and alexcrichton/bzip2-rs#17 for known users in a couple of crates.
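
A small std-only reproduction of the problem, as a sketch: LimitedWriter is a demonstration-only type that accepts a few bytes and then reports WouldBlock, the way a non-blocking socket with a full send buffer might.

```rust
use std::io::{self, ErrorKind, Write};

/// Demonstration-only writer: accepts up to `capacity` bytes in total,
/// then reports WouldBlock on every further write.
struct LimitedWriter {
    capacity: usize,
    accepted: Vec<u8>,
}

impl Write for LimitedWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let room = self.capacity - self.accepted.len();
        if room == 0 {
            return Err(io::Error::new(ErrorKind::WouldBlock, "not ready"));
        }
        let n = room.min(buf.len());
        self.accepted.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Calls `write_all` and returns its result together with the number of
/// bytes the writer really took. Only this test harness can see that count;
/// the `io::Result<()>` from `write_all` does not carry it.
fn attempt(data: &[u8], capacity: usize) -> (io::Result<()>, usize) {
    let mut w = LimitedWriter { capacity, accepted: Vec::new() };
    let result = w.write_all(data);
    (result, w.accepted.len())
}
```

With a capacity of 4 and 10 bytes of input, write_all fails with WouldBlock after 4 bytes have already been written, and the caller has no way to learn where to resume.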

FreeBSD clients able to freeze a Tokio+Hyper Linux server

I'm having a really nasty bug with Pijul clients on FreeBSD able to freeze my Tokio+Hyper server nest.pijul.com, by just issuing pijul clone https://nest.pijul.com/pijul_org/pijul.

That server has a fixed number of threads, each running one Tokio event loop, on which three servers are running:

  • A HTTPS server, which gets frozen when the user tries to clone.
  • A HTTP server, which still works fine even when the HTTPS server is frozen.
  • An SSH server, which also still works fine even when the HTTPS server is frozen.

Moreover, when this happens, the CPU load is normal.

After weeks of debugging, I've come to the conclusion that either Tokio or Hyper may be responsible, and I'm not yet sure which. I can't reproduce the problem in a virtual machine (I've tried VirtualBox), but I don't know which network stack gets used then.

Is it possible that because FreeBSD has a totally different networking stack from Linux', socket control works in different ways?

Changelog

Would you mind adding a CHANGELOG.md?

Does it make sense that tokio::codec::Decoder's decode method takes a BytesMut?

While trying to use prost with tokio's framed codec mechanism, I found I couldn't use its decode method because it expects an IntoBuf, which BytesMut isn't (I created an issue on prost about this: https://github.com/danburkert/prost/issues/33).

But it seems to me that it doesn't make a lot of sense that decode is being given a BytesMut to begin with. A decoder probably shouldn't be modifying the underlying bytes of the buffer. Wouldn't passing in Bytes or an IntoBuf make more sense here?

Add poll_read/poll_write and need_read/need_write

As described in tokio-rs/tokio-core#196, I need these methods to implement sendfile support. And yes, contrary to #6, it would be good to have them on the trait object.

In the latter case, tk-sendfile doesn't have to depend on tokio-core, and libraries that need sendfile support for their own streams (say, an HTTP library) can implement AsyncWrite + AsRawFd instead of depending on tk-sendfile itself for a custom trait (because of coherence rules).

Bidirectional copy?

I think it would be useful (e.g. for building proxies) to have bidirectional copy implemented. Of course, one could call split() on the two I/O objects, then call copy() for each direction, and finally join() the resulting futures. There are some shortcomings with this approach:

  • One needs to write the code explicitly. Apart from requiring more typing, this decreases readability (bidirectional_copy(io1, io2) is easier to understand than about five lines of code, especially if bidirectional_copy becomes as well known as copy is)
  • split() allocates (Arc<BiLock<_>>)
  • The operation involves locking and could cause a slowdown. Bidirectional copy could avoid it.

Of course, maybe bidirectional_copy isn't the best name. I don't care much about the name as long as it will be readable.

AsyncWrite wrappers that consume self can fail because of WouldBlock

See gyscos/zstd-rs#22 as an example. There's a method called finish() which consumes self but could potentially fail because of WouldBlock.

This can lead to writers being stuck in an unrecoverable state even though all they needed to do was retry on next poll().

Is there any way to handle this short of adding a non-consuming version of any methods that could consume self yet cause writes?

AsyncWrite::shutdown semantics are a little unclear

I was having a look at the docs for shutdown and they're a bit unclear to me.

This shutdown method is required by implementors of the AsyncWrite trait. Wrappers typically just want to proxy this call through to the wrapped type, and base types will typically implement shutdown logic here or just return Ok(().into()).

Is this always the case? In #35 we found a case where we might want to call shutdown to write out the zstd end-of-stream marker but not to actually shut down the underlying AsyncWrite -- maybe you'd want to write more data to it afterwards? Is that even a legit use case for shutdown?

Invocation of a shutdown implies invocation of flush. Once this method returns Ready it implies that a flush successfully happened before the shutdown happened.

What does that mean? Do I as an implementer of shutdown have to call self.flush inside it unconditionally? If I'm a wrapper over another AsyncWrite, do I only have to call self.flush if I have some internal buffering?

Rename `AsyncWrite::shutdown`

Because this function is unrelated to TCP shutdown (and the AsyncWrite::shutdown impl for TcpStream does not call shutdown), the name of the function is very confusing.

A couple of options:

  • finalize
  • prepare_for_close
  • poll_close_ready
  • start_close

Related: #73.

FramedRead only calls decode_eof when there is buffered data

This issue is related to the discussion in tokio-rs/tokio-core#178

Given the tokio-proto pattern of representing a body stream as Body(Option<Chunk>), an HTTP 1.1 Decoder would require decode_eof to be called even if there is no more data.

In the case of streaming bodies w/ Connection: close and no content length, the body stream is all the data until the socket is closed. This means that an HTTP 1.1 decoder would need to emit a Body(None) frame on EOF when there is no buffered data.

Clean shutdown of TCP sockets

I just implemented clean connection shutdown again in Thrussh (since I moved it to Tokio a few months ago).

The Tokio interface for that was not totally easy. Here are things I'd like to have:

  • I needed to call .shutdown(Shutdown::Both) on TcpStream. This works as long as the underlying connection is indeed a TcpStream, but what if I want to run the connection in a buffer for testing? I wrote a trait called Tcp, containing a method shutdown that does that, but I guess there could be something more generic.

  • There is a name conflict on shutdown methods in TcpStream and AsyncWrite. Not a big deal for me, but on a larger codebase, it might be burdensome. How about having the shutdown method of TcpStream not take self, but instead having to call tokio_core::net::TcpStream::shutdown(&mut stream)?

  • Or there might be something more general: I don't know whether "TCP shutdown" should be called from within the AsyncWrite::shutdown method of TcpStream, because in some cases a proper TCP shutdown might take a number of parameters. But I guess it is fairly standard to send a shutdown and wait for a read to return 0, so why not have another type called CleanShutdownTcpStream whose AsyncWrite::shutdown method does exactly that?

Consider Removing the `io::Read` and `io::Write` supertraits on AsyncRead and AsyncWrite

Many of the methods on io::Read and io::Write, such as write_all, are not appropriate to call in an async context. Furthermore, lots of code using io::Read and io::Write expects (correctly or not) that those interfaces are blocking. Removing the supertraits would make it harder to misuse the AsyncRead and AsyncWrite types as io::Read and io::Write types, and would allow moving the io::XXX functions to methods on the AsyncRead and AsyncWrite traits (this last point is a benefit because methods are more easily discoverable than free functions). This change could also allow AsyncRead and AsyncWrite types with non-io::Error error types.

There are places in which it makes sense to provide an async-readable/writeable type as an io::Read or io::Write. That functionality can still be provided by choosing to opt-in to an io::Read or io::Write impl, or by using some form of wrapper type which allows using an AsyncRead/AsyncWrite as an io::Read/io::Write (basically the inverse of #76).

cc @carllerche

try_ready! equivalent for Decoder

I want to use some utility functions across multiple Decoder implementations to decode data of various types. If the return type of decode were Result<Async<Self::Item>, Self::Error>, I could use try_ready! from the futures crate. decode uses Option instead of Async, however, so I'm forced to either repeat code to the effect of

let val = match my_utility_fn(src)? {
    Some(val) => val,
    None => return Ok(None),
};

or write my own macro. It would be nice to have a convenient, common macro for this pattern in one of the tokio crates.
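
Such a macro is short enough to sketch here. The name try_some! is hypothetical (no tokio crate is claimed to export it), and peek_u16 and decode_pair only exist to show it in use on a plain Vec<u8>.

```rust
use std::io;

/// Hypothetical `try_some!`: unwraps `Ok(Some(v))` to `v`; otherwise returns
/// early from the enclosing function with `Ok(None)` or the converted error.
macro_rules! try_some {
    ($e:expr) => {
        match $e {
            Ok(Some(val)) => val,
            Ok(None) => return Ok(None),
            Err(err) => return Err(From::from(err)),
        }
    };
}

/// Example utility: reads a big-endian u16 at offset `at` without consuming.
fn peek_u16(src: &[u8], at: usize) -> io::Result<Option<u16>> {
    match src.get(at..at + 2) {
        Some(b) => Ok(Some(u16::from_be_bytes([b[0], b[1]]))),
        None => Ok(None),
    }
}

/// A decode-style function built from the utility.
fn decode_pair(src: &mut Vec<u8>) -> io::Result<Option<(u16, u16)>> {
    let a = try_some!(peek_u16(&src[..], 0));
    let b = try_some!(peek_u16(&src[..], 2));
    src.drain(..4); // consume only once the whole item is available
    Ok(Some((a, b)))
}
```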

Do not depend on Bytes property of preserving initial capacity

Currently, Decoder implementations depend on the fact that Bytes has two capacity fields: current capacity and initial capacity.

See the discussion here: tokio-rs/bytes#122 (comment)

I think that part of Bytes is overengineered, and if the initial capacity needs to be preserved, that should be handled by tokio-io (for example, by introducing a Bytes wrapper that preserves the initial capacity).

'lines_decoder' fails on 32-bit

	thread 'lines_decoder' panicked at 'assertion failed: self.remaining_mut() >= src.remaining()', /usr/share/cargo/registry/bytes-0.4.5/src/buf/buf_mut.rs:229:8

Not sure what other information to add..

add `read_at_least` function?

read_at_least is similar to read_exact, but reads at least min bytes: it reads from reader into buf until it has read at least min bytes. On success it returns the number of bytes copied, n, where n >= min.
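
The intended semantics, sketched as a blocking function over std::io::Read. An async version in tokio-io would presumably return a future instead. The error choices (InvalidInput for a buffer shorter than min, UnexpectedEof for an early end of stream) are assumptions.

```rust
use std::io::{self, ErrorKind, Read};

/// Reads from `reader` into `buf` until at least `min` bytes have been read.
/// Returns the number of bytes read, `n`, with `n >= min` on success.
fn read_at_least<R: Read>(reader: &mut R, buf: &mut [u8], min: usize) -> io::Result<usize> {
    if buf.len() < min {
        return Err(io::Error::new(ErrorKind::InvalidInput, "buffer shorter than min"));
    }
    let mut n = 0;
    while n < min {
        match reader.read(&mut buf[n..]) {
            // The stream ended before `min` bytes arrived.
            Ok(0) => return Err(io::Error::new(ErrorKind::UnexpectedEof, "early eof")),
            Ok(read) => n += read,
            // Interrupted reads are simply retried.
            Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    Ok(n)
}
```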

Consider switching Framed & co. to be generic over the error vs. using io::Error

This has been requested a few times. I am creating this issue to track the discussion.

The original point of adding Codec in tokio-core was to provide an easy path from a TCP connection to a server using tokio-proto. In the server case, tokio-proto is the one handling the socket. As such, it is unable to take advantage of "generic" errors. If the codec errors out, tokio-proto shuts down the connection. Given that the goal is an easy mode to plug into tokio-proto, and that -proto requires an io::Error (or some fixed error type) anyway, Codec was hard-coded to io::Error in order to remove that decision for a newcomer to Tokio.

A number of requests have been made to switch Encoder / Decoder (the next iteration of Codec) to be generic over error. This would depart from the original goal described above to make it a more general purpose abstraction for building transports.

The questions as far as I can tell are:

a) How much does this change impact the getting started experience?
b) Should the getting started experience be split out from Framed?

FramedRead::into_inner throws away buffered data

(This is an issue with the current release version of tokio, but I decided to file it here since this looks like the future.)

More fun in my adventures with tokio :) I was trying to use Framed::into_inner and I found out that anything buffered gets thrown away. It would probably be worth adding a variant that returns a wrapper around the backing Read object that prepends whatever was buffered here.
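
With blocking std types, such a wrapper is a one-liner, sketched below. with_leftover is an illustrative name, and an async variant would need the equivalent adapter to implement AsyncRead.

```rust
use std::io::{Chain, Cursor, Read};

/// Rebuilds a reader that first yields the bytes a framing layer had already
/// buffered, then continues with the underlying reader.
fn with_leftover<R: Read>(buffered: Vec<u8>, inner: R) -> Chain<Cursor<Vec<u8>>, R> {
    Cursor::new(buffered).chain(inner)
}
```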

Support for async seeking

It would be cool to have async seeking support, in addition to async writing and async reading. I would need it for a tokio backend for a planned version of my ogg crate that supports seeking and has better async support. Otherwise I'd have to continue to roll my own, which I'd rather not do.

LinesCodec repeatedly returns final line when line is not terminated with \n

I would expect that if one sends data that is not terminated by newline at EOF, LinesCodec either returns an error or returns the remaining data and thereafter returns None (I think the latter is what is intended).

What actually happens is that it returns the remaining data over and over again with repeated calls to decode_eof.

let mut codec = LinesCodec::new();
let buf = &mut BytesMut::new();
buf.put("line 1");
assert_eq!(None, codec.decode(buf).unwrap());
assert_eq!("line 1", codec.decode_eof(buf).unwrap().unwrap());

// assert_eq!(None, codec.decode_eof(buf).unwrap());   // What I expect

assert_eq!("line 1", codec.decode_eof(buf).unwrap().unwrap());  // What actually happens
assert_eq!("line 1", codec.decode_eof(buf).unwrap().unwrap());  // .. repeatedly
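
The expected behavior can be modeled with a std-only sketch. This is not the LinesCodec source: it works on Vec<u8>, and decode_line and decode_line_eof are illustrative names. The key step is that the EOF path drains the remainder, so a repeated call returns None.

```rust
/// Splits one newline-terminated line off the front of `buf`, if present.
fn decode_line(buf: &mut Vec<u8>) -> Option<String> {
    let pos = buf.iter().position(|&b| b == b'\n')?;
    let line: Vec<u8> = buf.drain(..=pos).collect();
    // Drop the trailing '\n' from the returned line.
    Some(String::from_utf8_lossy(&line[..pos]).into_owned())
}

/// EOF variant: returns any unterminated final line exactly once.
fn decode_line_eof(buf: &mut Vec<u8>) -> Option<String> {
    if let Some(line) = decode_line(buf) {
        return Some(line);
    }
    if buf.is_empty() {
        return None;
    }
    // Take the remainder out of the buffer so that the next call sees an
    // empty buffer instead of the same line again.
    let rest = std::mem::take(buf);
    Some(String::from_utf8_lossy(&rest).into_owned())
}
```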
