Giter Site home page Giter Site logo

esockd's Introduction

esockd Build status

Erlang General Non-blocking TCP/SSL Socket Server.

Features

  • General Non-blocking TCP/SSL Socket Server
  • Acceptor Pool and Asynchronous TCP Accept
  • UDP/DTLS Server
  • Max connections management
  • Allow/Deny by peer address
  • Proxy Protocol V1/V2
  • Keepalive Support
  • Rate Limit
  • IPv6 Support

Usage

A Simple TCP Echo Server:

-module(echo_server).

-export([start_link/2, init/2]).

start_link(Transport, Sock) ->
    {ok, spawn_link(?MODULE, init, [Transport, Sock])}.

init(Transport, Sock) ->
    case Transport:wait(Sock) of
        {ok, NewSock} ->
            loop(Transport, NewSock);
        Error -> Error
    end.

loop(Transport, Sock) ->
    case Transport:recv(Sock, 0) of
        {ok, Data} ->
            {ok, Peername} = Transport:peername(Sock),
            Transport:send(Sock, Data),
            loop(Transport, Sock);
        {error, Reason} ->
            io:format("TCP Error: ~s~n", [Reason]),
            {stop, Reason}
    end.

Setup Echo Server:

%% Start esockd application
ok = esockd:start().
Options = [{acceptors, 10}, {max_connections, 1024}, {tcp_options, [binary, {reuseaddr, true}]}].
MFArgs = {echo_server, start_link, []},
esockd:open(echo, 5000, Options, MFArgs).

Examples

Example Description
examples/async_recv prim_net async recv/send
examples/gen_server gen_server behaviour
examples/simple simple echo server
examples/ssl ssl echo server
examples/proxy_protocol proxy protocol v1/2
examples/udp udp echo server
examples/dtls dtls echo server

API

Open a listener

esockd:open(echo, 5000, [{tcp_options, [binary, {reuseaddr, true}]}],
            {echo_server, start_link, []}).

esockd:open(echo, {"127.0.0.1", 6000}, [{tcp_options, [binary, {reuseaddr, true}]}],
            {echo_server, start_link, []}).

Spec:

-spec(open(Protocol, ListenOn, Options, MFArgs) -> {ok, pid()} | {error, term()} when
           Protocol :: atom(),
           ListenOn :: inet:port_number() | {host(), inet:port_number()}),
           Options  :: [option()],
           MFArgs   :: esockd:mfargs()).

Option:

-type(option() :: {acceptors, pos_integer()}
                | {max_connections, pos_integer()}
                | {max_conn_rate, pos_integer()}
                | {access_rules, [esockd_access:rule()]}
                | {shutdown, brutal_kill | infinity | pos_integer()}
                | tune_buffer | {tune_buffer, boolean()}
                | proxy_protocol | {proxy_protocol, boolean()}
                | {proxy_protocol_timeout, timeout()}
                | {ssl_options, [ssl:ssl_option()]}
                | {udp_options, [gen_udp:option()]}
                | {dtls_options, [gen_udp:option() | ssl:ssl_option()]}).

MFArgs:

-type(mfargs() :: atom() | {atom(), atom()} | {module(), atom(), [term()]}).

Get Setting and Stats

Get stats:

esockd:get_stats({echo, 5000}).

Get acceptors:

esockd:get_acceptors({echo, {"127.0.0.1", 6000}}).

Get/Set max connections:

esockd:get_max_connections({echo, 5000}).
esockd:set_max_connections({echo, 5000}, 100000).

Allow/Deny

Same to Allow/Deny Syntax of nginx:

allow address | CIDR | all;

deny address | CIDR | all;

allow/deny by options:

esockd:open(echo, 5000, [{access, [{deny, "192.168.1.1"}, {allow, "192.168.1.0/24"}, {deny, all}]}], MFArgs).

allow/deny by API:

esockd:allow({echo, 5000}, all).
esockd:allow({echo, 5000}, "192.168.0.1/24").
esockd:deny({echo, 5000}, all).
esockd:deny({echo, 5000}, "10.10.0.0/16").

Close a listener

esockd:close(echo, 5000).
esockd:close(echo, {"127.0.0.1", 6000}).

Spec:

-spec(close(Protocol, ListenOn) -> ok when Protocol :: atom(), ListenOn :: inet:port_number() | {host(), inet:port_number()}).

SSL

Connecting to ssl_echo_server:

openssl s_client -connect 127.0.0.1:5000 -ssl3

openssl s_client -connect 127.0.0.1:5000 -tls1

Design

Supervisor Tree

esockd_sup
    -> esockd_listener_sup
        -> esockd_listener
        -> esockd_acceptor_sup
            -> esockd_acceptor
            -> esockd_acceptor
            -> ......
        -> esockd_connection_sup
            -> esockd_connection
            -> esockd_connection
            -> ......

Acceptor

  1. Acceptor Pool

  2. Suspend for one second when e{n, m}file errors happened

Connection Sup

  1. Create a connection, and let it run...

  2. Control maximum connections

  3. Count active connections

  4. Count shutdown reasons

CIDR

CIDR Wiki: https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing

Benchmark

Benchmark 2.1.0-alpha release on one 8 cores, 32G memory ubuntu/14.04 server::

250K concurrent connections, 50K messages/sec, 40Mbps In/Out consumed 5G memory, 20% CPU/core

License

Apache License Version 2.0

Author

EMQ X Team.

esockd's People

Contributors

emqplus avatar getong avatar gilbertwong96 avatar grutabow avatar hjianbo avatar huangdan avatar id avatar iequ1 avatar joaohf avatar keynslug avatar lafirest avatar qzhuyan avatar savonarola avatar terry-xiaoyu avatar thalesmg avatar tigercl avatar wwhai avatar zhongwencool avatar zmstone avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

esockd's Issues

SSL example seems to be broken - init terminating in do_boot

OS: Ubuntu 14
Erl : up to date, newest apt-get install

I am trying to run the ssl example. I run rebar compile, and when I use ./run I get the following output:

=PROGRESS REPORT==== 5-Jul-2017::09:10:57 ===
application: ssl
started_at: nonode@nohost
{"init terminating in do_boot",{undef,[{esockd,start,[],[]},{ssl_echo_server,start,1,[{file,"src/ssl_echo_server.erl"},{line,42}]},{init,start_em,1,[]},{init,do_boot,3,[]}]}}
init terminating in do_boot ()

Crash dump is being written to: erl_crash.dump...done

SSL PSK cipher suite support

Hello :

When I want use TLS PSK cipher suite:

Add {psk, aes_128_cbc, sha256} to ciphers configuration. emqttd_app.erl

start_listener({ssl, ListenOn, Opts}) ->
    Ciphers = proplists:get_value(ciphers, Opts, []),
    Ciphers1 = proplists:expand(Ciphers, [{psk, aes_128_cbc, sha256}]),
    Opts1 = emqttd_misc:merge_opts(Opts, [{ciphers, Ciphers1}, {user_lookup_fun, fun user_lookup/3, <<"secret">>}]),
    start_listener('mqtt:ssl', ListenOn, Opts1);

user_lookup(psk, Username, UserState) ->
    lager:debug("## PSK identity ~p ~p", [Username, UserState]),
    {ok, UserState}.

When I use client to test it, server return error:

 19:48:19.927 [info] TLS server: In state hello at tls_handshake.erl:197 generated SERVER ALERT: Fatal - Insufficient Security - no_suitable_ciphers

But I use code blow, it works well:

-module(psk_tls_test).
%% API
-export([start/0, client/1, accept/1]).

start() ->
    ssl:start(),
    server(8883).

server(Port) ->
    {ok, LSocket} = ssl:listen(Port, [
        {ciphers, [{psk, aes_128_cbc, sha256}]},
        {user_lookup_fun,
            {fun user_lookup/3, <<"secret">>}},
        {reuseaddr, true}]),
    spawn(fun() -> accept(LSocket) end).

user_lookup(psk, Username, UserState) ->
    io:format("Parameter ~p ~p~n", [Username, UserState]),
    {ok, UserState}.

accept(LSocket) ->
    {ok, Socket} = ssl:transport_accept(LSocket),
    case ssl:ssl_accept(Socket) of
        ok -> Pid = spawn(fun() ->
            io:format("Connection accepted ~p~n", [Socket]),
            loop(Socket)
                          end),
            ssl:controlling_process(Socket, Pid),
            accept(LSocket);
        Reason ->
            io:format("Connection Error ~p~n", [Reason])
    end.

loop(Socket) ->
    ssl:setopts(Socket, [{active, once}]),
    receive
        {ssl, Sock, Data} ->
            io:format("Got packet: ~p~n", [Data]),
            ssl:send(Sock, Data),
            loop(Socket);
        {ssl_closed, Sock} ->
            io:format("Closing socket: ~p~n", [Sock]);
        Error ->
            io:format("Error on socket: ~p~n", [Error])
    end.
client(N) ->
    {ok, Socket} = ssl:connect("localhost", 8883, [{ciphers, [{psk, aes_128_cbc, sha256}]}, {user_lookup_fun,
        {fun user_lookup/3, <<"secret">>}}]),
    io:format("Client opened socket: ~p~n", [Socket]),
    ok = ssl:send(Socket, N),
    Value = receive
                {ssl, {sslsocket, new_ssl, _}, Data} ->
                    io:format("Client received: ~p~n", [Data])
            after 2000 ->
            0
            end,
    ssl:close(Socket),
    Value.

esockd_connection_sup should be redesinged

esockd_connectoin_sup should not use supervisor with 'simple_one_for_one' mode..

esockd_connection_sup should be responsible for:

  1. link or monitor all children
  2. max connections controll
  3. deny/allow access control

the most important issue is how to store pids of children?

dict? set? map? proccess dictionary?

lager_logger:error undef exception

** When handler state == {<0.42.0>,"log/emqttd_sasl.log",all}
** Reason == {'module could not be loaded',
[{lib,format_exception,
[5,exit,
{undef,
[{lager_logger,error,
["acceptor on ~s suspend 100(ms) for ~p emfile errors!!!",
[["0.0.0.0",58,"1883"],0]],
[]},
{esockd_acceptor,sockerr,2,
[{file,"src/esockd_acceptor.erl"},{line,169}]},
{gen_server,try_dispatch,4,
[{file,"gen_server.erl"},{line,615}]},
{gen_server,handle_msg,5,
[{file,"gen_server.erl"},{line,681}]},
{proc_lib,init_p_do_apply,3,
[{file,"proc_lib.erl"},{line,240}]}]},
[{gen_server,terminate,7,
[{file,"gen_server.erl"},{line,826}]},
{proc_lib,init_p_do_apply,3,
[{file,"proc_lib.erl"},{line,240}]}],
#Fun<proc_lib.0.32310957>,#Fun<proc_lib.1.32310957>,
latin1],
[]},
{proc_lib,format_exception,4,
[{file,"proc_lib.erl"},{line,739}]},
{proc_lib,format_rep,2,[{file,"proc_lib.erl"},{line,728}]},
{proc_lib,format_rep,2,[{file,"proc_lib.erl"},{line,730}]},
{proc_lib,format_rep,2,[{file,"proc_lib.erl"},{line,726}]},
{proc_lib,format,3,[{file,"proc_lib.erl"},{line,714}]},
{sasl_report,write_report2,5,
[{file,"sasl_report.erl"},{line,72}]},
{sasl_report_file_h,handle_event,2,
[{file,"sasl_report_file_h.erl"},{line,44}]}]}

ssl socket controlling_process

I wrote a async ssl server with esockd. And it worked. Now I want to spawn a new process using the simple_one_for_one for this ssl socket, but it nothing happened.

%% tls_server.erl
handle_info({inet_async, Socket, _Ref, {ok, Data}}, #state{transport = Transport, socket = Socket}) ->
    {ok, {IP, _Port}} = Transport:peername(Socket),
    io:format("Ip is ~p~n", [IP]),

    %% New client connected - spawn a new process using the simple_one_for_one
    Pid = erlim_tls_receiver_sup:start_child(Transport, Socket),
    Transport:controlling_process(Socket, Pid),
    {noreply, State}.

New connect is

%% tls_receiver.erl

%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link(Transport, Socket) ->
    gen_server:start_link(?MODULE, [Transport, Socket], []).


%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%%                     {ok, State, Timeout} |
%%                     ignore |
%%                     {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([Transport, Socket]) ->
    {ok, {IP, _Port}} = Transport:peername(Socket),
    io:format("Ip is ~p~n", [IP]),
    NewState = #state{transport = Transport, socket = Socket, ip = IP, node = node()},
    Transport:async_recv(Socket, 0, infinity),
    {ok, NewState}.


handle_info({inet_async, Socket, _Ref, {ok, Data}}, #state{transport = Transport, socket = Socket}) ->
    io:format("Data is ~p~n", [Data]).

Gen_sever example in detail.

Hello , i was wondering how to perfectly user esockd with gen_server or gen_fsm or gen_statem ?? can you give tutorial in detail ? i know there is example in examples folder, but i want something like you guys implemented in emqttd.

Is the Docs out dated??

I found in the emqttd broker, in emqttd_client.erl, there is only one start_link/2 function, and the docs of esockd, states using a start_link/1 function instead.

when i digged into the source code, i found esockd used the erlang:apply(M,F, [Conn|Args]), which is the start_link/2, so is the doc not updated? if not, please do it, otherwise it will make us confused.

acceptor crasher: {accept_error,econnaborted}

=CRASH REPORT==== 15-Apr-2016::06:53:13 ===
crasher:
initial call: esockd_acceptor:init/1
pid: <0.348.0>
registered_name: []
exception exit: {accept_error,econnaborted}
in function gen_server:terminate/7 (gen_server.erl, line 826)
ancestors: [<0.325.0>,<0.323.0>,esockd_sup,<0.122.0>]
messages: []
links: [<0.325.0>]
dictionary: []
trap_exit: false
status: running
heap_size: 376
stack_size: 27
reductions: 64172532
neighbours:

esockd_connection:ack

I can't find the function ack in esockd_connection.erl.
What does esockd_connection:ack means?

args

esockd_acceptor_sup -> esockd_acceptor
input 4 args (esockd_acceptor, start_link, [ConnSup, AcceptStatsFun, BufferTuneFun, Logger])
accept 6 args (start_link(ConnSup, AcceptStatsFun, BufTuneFun, Logger, LSock, SockFun))
where is LSock, SockFun?

test broken

Debian stable (stretch)
Erlang pkgs 20.1-1 from Erlang Solutions

Hi folks,

test is failing:

$ rebar3 ct
===> Verifying dependencies...
===> Compiling esockd
===> Compiling src/esockd_proxy_proto.erl failed
src/esockd_proxy_proto.erl:134: function parse_proxy_v1/1 undefined
src/esockd_proxy_proto.erl:136: function parse_proxy_v2/1 undefined

Jean-Yves

{accept_error,econnaborted} crash report

=SUPERVISOR REPORT==== 26-Dec-2015::00:45:35 ===
     Supervisor: {<0.325.0>,esockd_acceptor_sup}
     Context:    child_terminated
     Reason:     {accept_error,econnaborted}
     Offender:   [{pid,<0.403.0>},
                  {name,acceptor},
                  {mfargs,
                      {esockd_acceptor,start_link,
                          [<0.324.0>,#Fun<esockd_server.0.76791649>,
                           #Fun<esockd_listener_sup.1.16895020>,
                           {gen_logger,lager_logger,8},
                           #Port<0.5635>,#Fun<esockd_transport.2.94509882>]}},
                  {restart_type,transient},
                  {shutdown,5000},
                  {child_type,worker}]

Is esockd_connection_sup a supervisor or a gen_server

%% start child
{ok, ConnSup} = supervisor:start_child(Sup,
{connection_sup,
{esockd_connection_sup, start_link, [Options, MFArgs, Logger]},
transient, infinity, supervisor, [esockd_connection_sup]})

% % but, esockd_connection_sup is a gen_server
-module(esockd_connection_sup).
-behaviour(gen_server).

So, Is it a bug or for some special purpose?

Rate limiting applied on communication to subscribers

Hello EMQ Team, Feng,

Rate limiting appear to be applied only to input flow from publishers to EMQ.
Could it be applied to output flow from EMQ, if not, could you plan to provide the same. This can be useful when the available bandwidth to subscribers is limited or when the subscriber can run only at a slower or fixed rate. This is wrt mqtt.listener.ssl.rate_limit and mqtt.listener.tcp.rate_limit in 2.1.2 version.

Thanks.

Can't get peercert subject name in ppv2

The https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt has not supported the cert Subject Name, but we supported it with Common Name 😒

subject(nossl) -> undefined;
subject(undefined) -> undefined;
subject(Cert) when is_binary(Cert) ->
esockd_ssl:peer_cert_subject(Cert);
subject(PP2Info) when is_list(PP2Info) ->
%%Notice: DN is not available in ppv2 additional info
proplists:get_value(pp2_ssl_cn, PP2Info).

This issue was found while investigating emqx/emqx#9147

esockd as dependency by rebar3 miss .hrl file while release

I add a few deps to my project with rebar3,
I use -include_lib("esockd/include/esockd.hrl"). to include esockd's hrl file,
after created release, I fould lib's esock include folder is empty so that could not found the hrl file, I think it's not necessary for esock, does any proper way to include those hrl file???

Too many connection_shutdown report logs when connections from unstable mobile network

015-09-07 19:52:45 =SUPERVISOR REPORT====
     Supervisor: 'esockd_connection_sup - <0.298.0>'
     Context:    connection_shutdown
     Reason:     duplicate_id
     Offender:   [{pid,<0.1020.40>},{name,connection},{mfargs,{emqttd_client,start_link,[[{max_clientid_len,1024},{max_packet_size,65536}]]}}]

2015-09-07 19:52:45 =SUPERVISOR REPORT====
     Supervisor: 'esockd_connection_sup - <0.298.0>'
     Context:    connection_shutdown
     Reason:     conn_closed
     Offender:   [{pid,<0.21403.17>},{name,connection},{mfargs,{emqttd_client,start_link,[[{max_clientid_len,1024},{max_packet_size,65536}]]}}]

...

support connection based udp packet content

The current esockd is based on socket peerinfo to create a connection, but the survival time of the udp session on the router is generally very short.
Coupled with the influence of NAT, the peerinfo of the same client may be constantly changing.
So may we can support create a connection based on the content carried in the udp package

connection_on_data

format nested list

  1. format({Addr, Port}) ->
    A = maybe_ntoab(Addr),
    io_lib:format("~s:~p", [A, Port]).
    better
  2. format({Addr, Port}) ->
    A = maybe_ntoab(Addr),
    lists:flatten(io_lib:format("~s:~p", [A, Port])).

cause 1) gives something like ["127.0.0.1",58,"4567"]
2) makes "127.0.0.1:4567"

Proxy Protocol does not work with ssl

a)I have tested ssl example, connection with my custom key, cert which seems to be receiving data fine ,
b)I have also tested proxy protocol example and I receive the data from the socket after connection,
Problem -
Tried adding the option {connopts, [proxy_protocol, {proxy_protocol_timeout, 1000}]} to the ssl example,
but connection seems to be failing.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    πŸ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. πŸ“ŠπŸ“ˆπŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❀️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.