Receiving data from a single UDP socket with more than one Erlang process
Sometimes a storm of UDP packets hits your app, but the gen_udp docs give no hint on how to balance the processing among several processes. It is possible, though, to share the socket between workers and have each of them call gen_udp:recv in a loop, which can remove a potential bottleneck.
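Stripped of the gen_server machinery, the idea can be sketched with plain processes. This is a minimal sketch under stated assumptions, not the code from this post: the module name udp_share_sketch, the start/3 function and the {got, ID, Packet} message are hypothetical names chosen for illustration. How concurrent recv calls on one socket are served is left to the runtime, so any error other than a closed socket is simply retried after a short pause.

```erlang
-module(udp_share_sketch).
-export([start/3]).

%% Hypothetical helper: open one passive UDP socket and let N processes
%% read from it. Each received packet is reported to Parent.
start(Port, N, Parent) ->
    {ok, Sock} = gen_udp:open(Port, [{active, false}, binary]),
    Pids = [spawn_link(fun() -> loop(ID, Sock, Parent) end)
            || ID <- lists:seq(1, N)],
    {ok, Sock, Pids}.

%% Worker loop: wait up to one second for a datagram, then try again.
loop(ID, Sock, Parent) ->
    case gen_udp:recv(Sock, 0, 1000) of
        {ok, {_Addr, _Port, Packet}} ->
            Parent ! {got, ID, Packet},
            loop(ID, Sock, Parent);
        {error, closed} ->
            ok;                          % socket is gone, stop the worker
        {error, timeout} ->
            loop(ID, Sock, Parent);      % nothing arrived, wait again
        {error, _Other} ->
            timer:sleep(10),             % assumption: retry after a short pause
            loop(ID, Sock, Parent)
    end.
```

Calling udp_share_sketch:start(9876, 10, self()) from the shell and then sending datagrams to port 9876 should leave {got, ID, Packet} messages in the shell's mailbox, one per datagram.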
Here is a sample gen_server-based handler, with some code at the bottom for starting test instances.
-module(udp_test).
-behaviour(gen_server).

-define(SERVER, ?MODULE).
-define(PORT, 9876).

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/2, send/1,
         test_stuff/0, start_stuff/1]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2,
         handle_info/2, terminate/2, code_change/3]).

-record(state, { id=0, sock }).

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link(ID, Sock) ->
    gen_server:start_link(?MODULE, [{id, ID}, {socket, Sock}], []).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(Args) ->
    ID = proplists:get_value(id, Args, 1),
    case proplists:get_value(socket, Args) of
        undefined ->
            {stop, nosock};
        Sock ->
            {ok, #state{ id=ID, sock=Sock }, 0}
    end.

handle_call(_Request, _From, State) ->
    {stop, {error, unknownmsg}, State}.

handle_cast(_Request, State) ->
    {stop, {error, unknownmsg}, State}.

handle_info(timeout, #state{ sock=undefined } = State) ->
    {noreply, State, 10};
handle_info(timeout, #state{ id=ID, sock=Sock } = State) ->
    TO = case gen_udp:recv(Sock, 4) of
             {error, _} -> 10;
             _Data ->
                 catch ets:insert_new(?MODULE, {ID, 0}),
                 catch ets:update_counter(?MODULE, ID, 1)
         end,
    {noreply, State, TO}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% ------------------------------------------------------------------
%% Test Function Definitions
%% ------------------------------------------------------------------
test_stuff() ->
    start_stuff(10),
    [udp_test:send("fooobaar") || _ <- lists:seq(1,1000)],
    timer:sleep(1000),
    io:format("Total: ~p~n", [ets:tab2list(?MODULE)]),
    init:stop().

start_stuff(N) ->
    ets:new(?MODULE, [named_table, public]),
    {ok, S} = gen_udp:open(?PORT, [{active, false}, binary]),
    [start_link(ID, S) || ID <- lists:seq(1, N)].

send(Data) ->
    spawn(fun() ->
                  {ok, S} = gen_udp:open(0, [{active, false}, binary]),
                  gen_udp:send(S, localhost, ?PORT, Data)
          end).
Written by Valery Meleshkin
1 Response
My previous comment seems to be deleted.
Thank you for posting this; it is exactly the type of thing I needed. Earlier it was crashing the erl VM every time I ran test_stuff(), but now it seems to be working and I have no idea why.
I also noticed what I think is a bug in this code:
handle_info(timeout, #state{ id=ID, sock=Sock } = State) ->
    TO = case gen_udp:recv(Sock, 4) of
             {error, _} -> 10;
             _Data ->
                 catch ets:insert_new(?MODULE, {ID, 0}),
                 catch ets:update_counter(?MODULE, ID, 1)
         end,
    {noreply, State, TO}.
You are setting the timeout to the result of ets:update_counter, which is probably not what you want. I changed it to:
handle_info(timeout, #state{ id=ID, sock=Sock } = State) ->
    TO = case gen_udp:recv(Sock, 4) of
             {error, _} -> 10;
             _Data ->
                 catch ets:insert_new(?MODULE, {ID, 0}),
                 catch ets:update_counter(?MODULE, ID, 1),
                 10 % <--- TO = 10 not TO = ets:update_counter(..)
         end,
    {noreply, State, TO}.
and the throughput of the server doubled.
Thanks again for this gem, there is a complete lack of use of UDP in the Erlang literature.
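The timeout issue described in this response can be checked in isolation. The following is a minimal sketch, assuming a hypothetical module timeout_value_sketch and a private ETS table in place of the named one from the post. It evaluates the same two expressions as the success branch of the original case and collects the value the branch returns, which is what ends up in TO.

```erlang
-module(timeout_value_sketch).
-export([timeouts/1]).

%% Hypothetical helper: returns the value the original success branch
%% would yield for each of N received packets handled by one worker.
timeouts(N) ->
    Tab = ets:new(counters, [public]),
    ID = 1,
    TOs = [begin
               catch ets:insert_new(Tab, {ID, 0}),
               %% the last expression is the value of the whole branch
               catch ets:update_counter(Tab, ID, 1)
           end || _ <- lists:seq(1, N)],
    ets:delete(Tab),
    TOs.
```

Because ets:update_counter returns the new counter value, timeouts(5) gives [1,2,3,4,5]. In the original handler that value is used as the gen_server timeout in milliseconds, so a worker would wait longer after every packet it has counted.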