dizda / fast-socks5
Fast SOCKS5 client/server implementation written in Rust async/.await (with tokio)
Home Page: https://anyip.io/
License: MIT License
I'm trying to connect to socks5 over tor without opening a port. This is what I did:
https://github.com/acheong08/arti/blob/bb0cba1130d75ee0e58c9a42de0bad4a8ee14fa4/examples/hyper/hyper-http-hs-example/src/main.rs#L107-L160
Running it and connecting, I get:
SOCKS5 connection
onion service stream accepted
spawning task
task spawned
But the task never completes:
fn spawn_and_log_error<F, T, A>(fut: F) -> task::JoinHandle<()>
where
    F: Future<Output = Result<Socks5Socket<T, A>>> + Send + 'static,
    T: AsyncRead + AsyncWrite + Unpin,
    A: Authentication,
{
    eprintln!("spawning task");
    task::spawn(async move {
        eprintln!("task spawned");
        if let Err(e) = fut.await { // <---- Stuck here
            error!("{:#}", &e);
        }
        eprintln!("task finished");
    })
}
Any idea what could be causing the problem?
Using Socks5Stream:
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Other(Received malformed reply
Caused by:
early eof)', src/client.rs:410:26
It appears that the code expects a 4-byte reply, but it is only receiving a 2-byte one.
I'll double-check my findings when I'm more awake.
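For context, RFC 1928 specifies that every server reply to a request starts with a fixed 4-byte header [VER, REP, RSV, ATYP] before the bound address, while the method-selection and auth-status messages are only 2 bytes ([VER, METHOD] / [VER, STATUS]). So getting 2 bytes where 4 are expected often means the two sides are out of sync on which handshake phase they are in. A minimal, illustrative check of the reply header (not fast-socks5's actual parser):

```rust
// Illustrative RFC 1928 reply-header validation; not fast-socks5's code.
#[derive(Debug, PartialEq)]
enum ReplyError {
    TooShort,        // fewer than the 4 mandatory header bytes ("early eof")
    BadVersion(u8),  // VER field != 0x05
    Refused(u8),     // REP field != 0x00 (success)
}

/// Validate the fixed 4-byte prefix [VER, REP, RSV, ATYP] of a SOCKS5 reply.
fn check_reply_header(buf: &[u8]) -> Result<(), ReplyError> {
    if buf.len() < 4 {
        return Err(ReplyError::TooShort);
    }
    if buf[0] != 0x05 {
        return Err(ReplyError::BadVersion(buf[0]));
    }
    if buf[1] != 0x00 {
        return Err(ReplyError::Refused(buf[1]));
    }
    Ok(())
}

fn main() {
    // A 2-byte message (the size of a method-selection reply) fails:
    assert_eq!(check_reply_header(&[0x05, 0x00]), Err(ReplyError::TooShort));
    // A full success header [VER=5, REP=0, RSV=0, ATYP=1 (IPv4)] passes:
    assert_eq!(check_reply_header(&[0x05, 0x00, 0x00, 0x01]), Ok(()));
    println!("ok");
}
```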
In regards to #15: I managed to make that all work.
However (please check the code linked there), I had to make it catch
ErrorKind::NotConnected and ErrorKind::ConnectionReset.
This essentially means that we do have access to the bytes sent/received (written/read).
However, even when there is no error, that number always seems to be (0, 0) (I use it as a router, but the same happens when running a regular server as per your example).
I tried to write copy_bidirectional myself (copying the code and catching the error):
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use fast_socks5::ready;
use std::future::Future;
use std::io::{self, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};

enum TransferState {
    Running(CopyBuffer),
    ShuttingDown(u64),
    Done(u64),
}

struct CopyBidirectional<'a, A: ?Sized, B: ?Sized> {
    a: &'a mut A,
    b: &'a mut B,
    a_to_b: TransferState,
    b_to_a: TransferState,
}
fn transfer_one_direction<A, B>(
    cx: &mut Context<'_>,
    state: &mut TransferState,
    r: &mut A,
    w: &mut B,
) -> Poll<io::Result<u64>>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    let mut r = Pin::new(r);
    let mut w = Pin::new(w);

    loop {
        match state {
            TransferState::Running(buf) => {
                let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
                *state = TransferState::ShuttingDown(count);
            }
            TransferState::ShuttingDown(count) => {
                match ready!(w.as_mut().poll_shutdown(cx)) {
                    Ok(_) => (),
                    Err(err) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                        _ => return Poll::Ready(Err(err)),
                    },
                };
                *state = TransferState::Done(*count);
            }
            TransferState::Done(count) => return Poll::Ready(Ok(*count)),
        }
    }
}
impl<'a, A, B> Future for CopyBidirectional<'a, A, B>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    type Output = io::Result<(u64, u64)>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Unpack self into mut refs to each field to avoid borrow check issues.
        let CopyBidirectional {
            a,
            b,
            a_to_b,
            b_to_a,
        } = &mut *self;

        let a_to_b = transfer_one_direction(cx, a_to_b, &mut *a, &mut *b)?;
        let b_to_a = transfer_one_direction(cx, b_to_a, &mut *b, &mut *a)?;

        // It is not a problem if ready! returns early because transfer_one_direction for the
        // other direction will keep returning TransferState::Done(count) in future calls to poll
        let a_to_b = ready!(a_to_b);
        let b_to_a = ready!(b_to_a);

        Poll::Ready(Ok((a_to_b, b_to_a)))
    }
}
/// Copies data in both directions between `a` and `b`.
///
/// This function returns a future that will read from both streams,
/// writing any data read to the opposing stream.
/// This happens in both directions concurrently.
///
/// If an EOF is observed on one stream, [`shutdown()`] will be invoked on
/// the other, and reading from that stream will stop. Copying of data in
/// the other direction will continue.
///
/// The future will complete successfully once both directions of communication have been shut down.
/// A direction is shut down when the reader reports EOF,
/// at which point [`shutdown()`] is called on the corresponding writer. When finished,
/// it will return a tuple of the number of bytes copied from a to b
/// and the number of bytes copied from b to a, in that order.
///
/// [`shutdown()`]: crate::io::AsyncWriteExt::shutdown
///
/// # Errors
///
/// The future will immediately return an error if any IO operation on `a`
/// or `b` returns an error. Some data read from either stream may be lost (not
/// written to the other stream) in this case.
///
/// # Return value
///
/// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`.
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    CopyBidirectional {
        a,
        b,
        a_to_b: TransferState::Running(CopyBuffer::new()),
        b_to_a: TransferState::Running(CopyBuffer::new()),
    }
    .await
}
#[derive(Debug)]
struct CopyBuffer {
    read_done: bool,
    need_flush: bool,
    pos: usize,
    cap: usize,
    amt: u64,
    buf: Box<[u8]>,
}

impl CopyBuffer {
    pub(super) fn new() -> Self {
        Self {
            read_done: false,
            need_flush: false,
            pos: 0,
            cap: 0,
            amt: 0,
            buf: vec![0; 2048].into_boxed_slice(),
        }
    }
    pub(super) fn poll_copy<R, W>(
        &mut self,
        cx: &mut Context<'_>,
        mut reader: Pin<&mut R>,
        mut writer: Pin<&mut W>,
    ) -> Poll<io::Result<u64>>
    where
        R: AsyncRead + ?Sized,
        W: AsyncWrite + ?Sized,
    {
        loop {
            // If our buffer is empty, then we need to read some data to
            // continue.
            if self.pos == self.cap && !self.read_done {
                let me = &mut *self;
                let mut buf = ReadBuf::new(&mut me.buf);
                match reader.as_mut().poll_read(cx, &mut buf) {
                    Poll::Ready(Ok(_)) => (),
                    Poll::Ready(Err(err)) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                        _ => return Poll::Ready(Err(err)),
                    },
                    Poll::Pending => {
                        // Try flushing when the reader has no progress to avoid deadlock
                        // when the reader depends on buffered writer.
                        if self.need_flush {
                            match ready!(writer.as_mut().poll_flush(cx)) {
                                Ok(_) => (),
                                Err(err) => match err.kind() {
                                    ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                                    _ => return Poll::Ready(Err(err)),
                                },
                            };
                            self.need_flush = false;
                        }
                        return Poll::Pending;
                    }
                }
                let n = buf.filled().len();
                if n == 0 {
                    self.read_done = true;
                } else {
                    self.pos = 0;
                    self.cap = n;
                }
            }

            // If our buffer has some data, let's write it out!
            while self.pos < self.cap {
                let me = &mut *self;
                let i = match ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap])) {
                    Ok(i) => i,
                    Err(err) => match err.kind() {
                        // Placeholder: pretend one byte was written so the
                        // WriteZero branch below isn't hit on a benign disconnect.
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => 1,
                        _ => return Poll::Ready(Err(err)),
                    },
                };
                if i == 0 {
                    return Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "write zero byte into writer",
                    )));
                } else {
                    self.pos += i;
                    self.amt += i as u64;
                    self.need_flush = true;
                }
            }

            // If pos larger than cap, this loop will never stop.
            // In particular, user's wrong poll_write implementation returning
            // incorrect written length may lead to thread blocking.
            debug_assert!(
                self.pos <= self.cap,
                "writer returned length larger than input slice"
            );

            // If we've written all the data and we've seen EOF, flush out the
            // data and finish the transfer.
            if self.pos == self.cap && self.read_done {
                match ready!(writer.as_mut().poll_flush(cx)) {
                    Ok(_) => (),
                    Err(err) => match err.kind() {
                        ErrorKind::NotConnected | ErrorKind::ConnectionReset => (),
                        _ => return Poll::Ready(Err(err)),
                    },
                };
                return Poll::Ready(Ok(self.amt));
            }
        }
    }
}
However, this still has the same issue: the returned bytes read/written are (0, 0).
It is not as simple as that:
fast_socks5::server] incoming connection from peer 127.0.0.1:64345 @ 127.0.0.1:1337
[2022-05-05T22:26:57Z DEBUG proxy_gateway] handle incoming socket
[2022-05-05T22:26:57Z DEBUG proxy_gateway] upgrade incoming socket as socks5 proxy
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Handshake headers: [version: 5, methods len: 3]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] methods supported sent by the client: [0, 1, 2]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Reply with method AuthenticationMethod::Password (2)
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Auth: [version: 1, user len: 9]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] username bytes: [112, 117, 112, 112, 101, 116, 101, 101, 114]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Auth: [pass len: 3]
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] password bytes: [98, 97, 114]
[2022-05-05T22:26:57Z INFO fast_socks5::server] User `puppeteer` logged successfully.
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Request: [version: 5, command: 1, rev: 0, address_type: 1]
[2022-05-05T22:26:57Z DEBUG fast_socks5::util::target_addr] Address type `IPv4`
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Request target is 185.199.108.153:443
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Domain won't be resolved because `dns_resolve`'s config has been turned off.
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Connected to remote destination
[2022-05-05T22:26:57Z DEBUG fast_socks5::server] Wrote success
[2022-05-05T22:26:58Z INFO fast_socks5::server] transfer closed (615, 11010)
[2022-05-05T22:26:58Z DEBUG proxy_gateway] log original target address of incoming socket
[2022-05-05T22:26:58Z DEBUG proxy_gateway] resolve target dns for incoming socket
[2022-05-05T22:26:58Z INFO fast_socks5::client] Connected @ 127.0.0.1:1338
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Send version and method len [5, 2]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] client auth methods supported: [0, 2]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Socks version (5), method chosen: 2.
[2022-05-05T22:26:58Z INFO fast_socks5::client] Password will be used
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Auth: [version: 1, is_success: 0]
[2022-05-05T22:26:58Z INFO fast_socks5::client] Requesting headers `Some(Ip(185.199.108.153:443))`...
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] TargetAddr::IpV4
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] addr ip [185, 199, 108, 153]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Bytes long version: [5, 1, 0, 1, 185, 199, 108, 153, 1, 187, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Bytes shorted version: [5, 1, 0, 1, 185, 199, 108, 153, 1, 187]
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Padding: 10
[2022-05-05T22:26:58Z DEBUG fast_socks5::client] Reply received: [version: 5, reply: 0, rsv: 0, address_type: 1]
[2022-05-05T22:26:58Z DEBUG fast_socks5::util::target_addr] Address type `IPv4`
[2022-05-05T22:26:58Z INFO fast_socks5::client] Remote server bind on 127.0.0.1:0.
[2022-05-05T22:26:58Z INFO proxy_gateway] socket transfer closed (0, 0)
"2022-05-05 22:26:57.929731 UTC","185.199.108.153:443","127.0.0.1:1338","","",0
From the logs we can see that the size is returned for the initial part of the socks5 stream (the header),
but once we do the actual transfer we seem to log nothing.
What am I doing wrong here, both in my fork and in the vanilla code?
For my purposes I really need to be able to log the payload size.
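One generic way to get byte counts regardless of what the transfer helper returns is to wrap the stream itself in a counting adapter. The sketch below uses blocking std::io::Read/Write so it stays self-contained; the same idea applies to tokio's AsyncRead/AsyncWrite by delegating poll_read/poll_write and bumping the counters on Poll::Ready(Ok(..)). The CountingStream name is mine, not part of fast-socks5:

```rust
use std::io::{self, Read, Write};

/// Wraps any stream and counts the bytes that actually pass through it.
/// Illustrative only; for tokio streams you'd implement AsyncRead/AsyncWrite
/// the same way, incrementing the counters inside poll_read/poll_write.
struct CountingStream<T> {
    inner: T,
    bytes_read: u64,
    bytes_written: u64,
}

impl<T> CountingStream<T> {
    fn new(inner: T) -> Self {
        Self { inner, bytes_read: 0, bytes_written: 0 }
    }
}

impl<T: Read> Read for CountingStream<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.bytes_read += n as u64;
        Ok(n)
    }
}

impl<T: Write> Write for CountingStream<T> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.bytes_written += n as u64;
        Ok(n)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

fn main() {
    let mut s = CountingStream::new(io::Cursor::new(vec![0u8; 16]));
    let mut buf = [0u8; 10];
    s.read(&mut buf).unwrap();
    // The counters survive even if a copy helper reports (0, 0).
    assert_eq!(s.bytes_read, 10);
    assert_eq!(s.bytes_written, 0);
    println!("ok");
}
```

Because the counters live on the wrapper, they keep working even when the copy future is dropped early or swallows its return value.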
Is there any way to use it as a library instead of building it into a binary?
Hello,
Would it be possible to implement async function support for the authentication logic?
Currently the only thing one can do, when desiring to run a direct contact, is to repeatedly call upgrade_to_socks5 until it fails. Or how does it work? Does that function block until it is completely finished? I imagine not, as otherwise my router wouldn't work.
Would it be possible to use this library in an embedded setting with no_std?
I have a scenario where I'd like to use it in a low-power environment.
What should I do if the proxy server address is HTTP?
For logging purposes I would like to have access to the original unresolved target address.
There is a flag in the config to disable dns resolve, but in that case the upgrade to socks5 fails:
socket handle error = upgrade incoming socket to socks5: i/o error: Domain name has to be explicitly resolved, please use TargetAddr::resolve_dns().: Domain name has to be explicitly resolved, please use TargetAddr::resolve_dns().
and without the upgrade to socks5 we do not have access to any target address.
I know my use case is not typical, but would it be possible to also store the original target address separately, so that one can still access it? Because once the socket is upgraded to socks5, its target address has already been DNS resolved...
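As a sketch of the requested change, the server could carry both forms instead of overwriting the original when resolving. The type and field names below are hypothetical, not the crate's actual API:

```rust
use std::net::SocketAddr;

/// Hypothetical sketch: keep the client's original (possibly domain-based)
/// target alongside the DNS-resolved address, instead of replacing it.
#[derive(Debug, Clone)]
struct TargetInfo {
    /// Exactly what the client asked for, e.g. "example.com:443".
    original: String,
    /// Filled in only when dns_resolve is enabled.
    resolved: Option<SocketAddr>,
}

impl TargetInfo {
    fn new(original: impl Into<String>) -> Self {
        Self { original: original.into(), resolved: None }
    }

    /// What to show in logs: always available, resolved or not.
    fn display_target(&self) -> &str {
        &self.original
    }
}

fn main() {
    let mut t = TargetInfo::new("example.com:443");
    assert_eq!(t.display_target(), "example.com:443");
    t.resolved = Some("93.184.216.34:443".parse().unwrap());
    // The original stays accessible even after resolution.
    assert_eq!(t.display_target(), "example.com:443");
    println!("ok");
}
```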
It's great to see more projects in Rust; I like Rust too. I wrote a socks5 server in C and, just out of curiosity, compared their performance (fast-socks5 vs hev-socks5-server).
The hardware is an AMD Ryzen 9 7950X 16-Core Processor. Bandwidth was measured using iperf3, and the iperf3 client was proxied through socks5 using proxychains-ng.
# hev-socks5-server
bin/hev-socks5-server conf/main.yml # workers: 32
# fast-socks5
cargo run -r --example server -- --listen-addr 127.0.0.1:1080 no-auth
# iperf3 server
iperf3 -s
# download (1-thread)
proxychains iperf3 -c 127.0.0.1 -R
# download (10-thread)
proxychains iperf3 -c 127.0.0.1 -R -P 10
# upload (1-thread)
proxychains iperf3 -c 127.0.0.1
# upload (10-thread)
proxychains iperf3 -c 127.0.0.1 -P 10
The test focused on transfer speed, CPU usage and memory usage.
hev-socks5-server | Speed (Gbits/s) | CPU usage (%) | Memory usage (MB)
---|---|---|---
Download (1-thread) | 46.7 | 54 | 1.8
Download (10-thread) | 274 | 380 | 1.8
Upload (1-thread) | 46.7 | 54 | 1.8
Upload (10-thread) | 288 | 430 | 1.8

fast-socks5 | Speed (Gbits/s) | CPU usage (%) | Memory usage (MB)
---|---|---|---
Download (1-thread) | 38.7 | 92 | 3.2
Download (10-thread) | 121 | 858 | 3.5
Upload (1-thread) | 28.9 | 64 | 3.2
Upload (10-thread) | 128 | 882 | 3.5
I wonder what I am doing wrong; I can't get the server working.
What I did on v0.4.0 is:
RUST_LOG=debug cargo run --example server -- --listen-addr 127.0.0.1:1337 no-auth
Finished dev [unoptimized + debuginfo] target(s) in 0.07s
Running `target/debug/examples/server --listen-addr '127.0.0.1:1337' no-auth`
[2020-12-18T01:04:59Z WARN server] No authentication has been set!
[2020-12-18T01:04:59Z INFO server] Listen for socks connections @ 127.0.0.1:1337
then I verified via curl that the socks server is working:
curl --socks5 127.0.0.1:1337 https://www.google.com
curl: (28) SOCKS5 read timeout
The thing is that the curl command runs into the timeout only after a very long time, so it is not failing per se.
I debugged it and found that the futures::ready!(fut.poll(cx))? does not finish, as if the .accept() hangs somewhere in limbo.
Any idea what might be the problem?
rustc --version && cargo --version
rustc 1.48.0 (7eac88abb 2020-11-16)
cargo 1.48.0 (65cbdd2dc 2020-10-14)
on macOS Big Sur 11.0.1 (20B29)
cargo run -- --listen-addr 0.0.0.0:5000 password -u 111 -p 333
In the code I have written config.set_udp_support(true);
however, it still cannot support UDP and reports "invalid reply ip".
I saw there's a request_timeout on the server config; that makes sense.
However, I think it could be useful to also have a request_timeout on the client config. So basically, if the client one is set, take that as the timeout, or else default to the server config's request_timeout.
Does that make sense to add to this lib?
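The fallback rule proposed above (use the client's timeout if set, otherwise the server's) boils down to a one-liner; a sketch, with a hypothetical function name:

```rust
use std::time::Duration;

/// Pick the effective request timeout: the client's own setting wins,
/// otherwise fall back to the server config's value.
/// (Name and signature are illustrative, not part of fast-socks5.)
fn effective_timeout(client: Option<Duration>, server: Duration) -> Duration {
    client.unwrap_or(server)
}

fn main() {
    let server_default = Duration::from_secs(10);
    // No client setting: use the server's default.
    assert_eq!(effective_timeout(None, server_default), Duration::from_secs(10));
    // Client setting present: it takes precedence.
    assert_eq!(
        effective_timeout(Some(Duration::from_secs(3)), server_default),
        Duration::from_secs(3)
    );
    println!("ok");
}
```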
From the documentation and examples it is not clear what the intended approach with this library is when one wants to forward from a fast-socks5 proxy to another proxy.
I'll keep digging myself once I find time, and if I find the answer I'll post it here.
But if someone knows the answer from the top of their head, that would be even better :)
When a client disconnects on macOS (I think you are aware of this), we currently get false error logs. It's only logs, so it's not critical, but it would mean that they show up in my production logs if I ever used this project in production.
e.g.
ERROR fast_socks5::server] transfer error: Os { code: 57, kind: NotConnected, message: "Socket is not connected" }
within my code I catch some of those as:
Err(err) => match err.kind() {
    ErrorKind::NotConnected => {
        info!("socket transfer closed by client");
        Ok(())
    }
    ErrorKind::ConnectionReset => {
        info!("socket transfer closed by downstream proxy");
        Ok(())
    }
    _ => {
        traffic_log.err_code = 1;
        Err(SocksError::Other(anyhow!(
            "socket transfer error: {:#}",
            err
        )))
    }
},
I do not think this code is complete or the best, but hopefully it is a step in the right direction.
We could probably add some util code for this, as there may be a couple of locations where we want to treat certain expected errors as non-errors.
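Such util code could be a small predicate that centralizes the "expected disconnect" decision. The set of error kinds worth ignoring (and the name) below is a suggestion, not something fast-socks5 defines:

```rust
use std::io::ErrorKind;

/// Returns true for error kinds that just mean "the peer went away",
/// which a proxy usually wants to log as info rather than error.
/// The exact set of kinds here is a judgment call, not an official list.
fn is_benign_disconnect(kind: ErrorKind) -> bool {
    matches!(
        kind,
        ErrorKind::NotConnected      // macOS: "Socket is not connected" (os error 57)
            | ErrorKind::ConnectionReset
            | ErrorKind::BrokenPipe
            | ErrorKind::ConnectionAborted
    )
}

fn main() {
    assert!(is_benign_disconnect(ErrorKind::NotConnected));
    assert!(is_benign_disconnect(ErrorKind::ConnectionReset));
    assert!(!is_benign_disconnect(ErrorKind::PermissionDenied));
    println!("ok");
}
```

Callers could then collapse the repeated match arms above into a single `if is_benign_disconnect(err.kind())` check.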
In the case of your server example (i.e. direct usage of your socket: client <-> socket <-> target), it currently doesn't seem to be possible to get the bytes read/written? Would this be possible? I imagine so.
Hi, is there any plan to implement starting a proxy server using something like "long polling", to enable starting proxy servers without port forwarding?
Alternatives to long polling might be:
Short polling (wastes more CPU and network resources)
WebSockets (wastes way more resources, and there is no auto-reconnection and no auth system)
Server-Sent Events (harder to implement and wastes more resources)
Reason: some people like myself have port forwarding blocked by their ISPs and would still like to be able to use their home network as a proxy.