Last Updated: February 25, 2016
·
3.898K
· nekrograve

Receiving data from single UDP socket with more than one erlang process

Sometimes a storm of UDP packets can hit your app, but the gen_udp docs do not provide any clues on how to balance processing among several processes. However, it is possible to share a socket between workers and have each of them call gen_udp:recv in a loop, which eliminates this potential bottleneck.

Here is a sample gen_server-based handler, with some code at the bottom for starting test instances.

-module(udp_test).
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-define(PORT, 9876).

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------

-export([start_link/2, send/1,
         test_stuff/0, start_stuff/1]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------

-export([init/1, handle_call/3, handle_cast/2, 
         handle_info/2, terminate/2, code_change/3]).

-record(state, { id=0, sock }).

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

%% @doc Starts a linked, unregistered worker process.
%% `ID' and `Sock' are handed to init/1 as the proplist entries
%% `{id, ID}' and `{socket, Sock}'.
start_link(ID, Sock) ->
  InitArgs = [{id, ID}, {socket, Sock}],
  gen_server:start_link(?MODULE, InitArgs, []).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

%% @doc gen_server init callback.
%% Reads `id' (default 1) and `socket' from the proplist `Args'.
%% Without a socket the server refuses to start with reason `nosock';
%% otherwise it returns a 0 ms timeout so that handle_info(timeout, ...)
%% runs right after initialisation.
init(Args) ->
  WorkerId = proplists:get_value(id, Args, 1),
  init_state(WorkerId, proplists:get_value(socket, Args)).

%% Builds the init/1 return value from the looked-up id and socket.
init_state(_WorkerId, undefined) ->
  {stop, nosock};
init_state(WorkerId, Socket) ->
  {ok, #state{ id=WorkerId, sock=Socket }, 0}.

%% @doc No synchronous requests are supported: any call stops the
%% server with reason `{error, unknownmsg}' and no reply is sent.
handle_call(_Msg, _Caller, State) ->
  StopReason = {error, unknownmsg},
  {stop, StopReason, State}.

%% @doc No asynchronous requests are supported: any cast stops the
%% server with reason `{error, unknownmsg}'.
handle_cast(_Msg, State) ->
  StopReason = {error, unknownmsg},
  {stop, StopReason, State}.

%% @doc Receive loop, driven by gen_server timeouts.
%%
%% Every `timeout' message triggers one gen_udp:recv/2 call on the shared
%% socket. The callback always returns an explicit timeout, so the next
%% `timeout' message re-enters this function:
%%   * after a received packet: 0 ms, i.e. go straight back to receiving
%%     (same value init/1 uses to start the loop);
%%   * after a receive error, or when no socket is set: 10 ms back-off.
%%
%% The previous version used the value of ets:update_counter/3 as the
%% timeout, so the delay between receives grew by one millisecond per packet
%% handled (and became an invalid `{'EXIT', _}' term when the table was
%% missing).
handle_info(timeout, #state{ sock=undefined } = State) ->
  {noreply, State, 10};
handle_info(timeout, #state{ id=ID, sock=Sock } = State) ->
  %% The length argument (4) is kept exactly as in the original code.
  case gen_udp:recv(Sock, 4) of
    {ok, _Packet} ->
      count_packet(ID),
      {noreply, State, 0};
    {error, _Reason} ->
      {noreply, State, 10}
  end.

%% Increments this worker's packet counter in the ?MODULE ETS table,
%% creating the `{ID, 0}' row first if it does not exist yet.
%% Counting is best effort: the table is only created by start_stuff/1, so a
%% missing table (badarg) is ignored instead of crashing the worker.
count_packet(ID) ->
  try ets:update_counter(?MODULE, ID, 1, {ID, 0}) of
    _Count -> ok
  catch
    error:badarg -> ok
  end.

%% @doc gen_server terminate callback; performs no cleanup and returns `ok'.
terminate(_Why, _FinalState) -> ok.

%% @doc gen_server code_change callback; the state is carried over unchanged.
code_change(_FromVsn, CurrentState, _Extra) ->
  {ok, CurrentState}.

%% ------------------------------------------------------------------
%% Test Function Definitions
%% ------------------------------------------------------------------

%% @doc Smoke test: starts 10 workers on the shared socket, fires 1000
%% datagrams at them, waits one second, prints the contents of the ?MODULE
%% ETS table (the per-worker counters) and then stops the Erlang node.
test_stuff() ->
  _Workers = start_stuff(10),
  lists:foreach(fun(_) -> udp_test:send("fooobaar") end, lists:seq(1, 1000)),
  timer:sleep(1000),
  Counters = ets:tab2list(?MODULE),
  io:format("Total: ~p~n", [Counters]),
  init:stop().

%% @doc Creates the public named ETS table ?MODULE, opens one passive-mode
%% binary UDP socket on ?PORT and starts `N' workers (ids 1..N) that all
%% share that socket. Returns the list of start_link/2 results.
start_stuff(N) ->
  _Table = ets:new(?MODULE, [named_table, public]),
  {ok, SharedSock} = gen_udp:open(?PORT, [{active, false}, binary]),
  StartWorker = fun(WorkerId) -> start_link(WorkerId, SharedSock) end,
  lists:map(StartWorker, lists:seq(1, N)).

%% @doc Sends `Data' as a single datagram to localhost:?PORT from a freshly
%% spawned process that opens its own UDP socket on an OS-assigned port
%% (port 0). Returns the pid of the spawned sender.
send(Data) ->
  Sender =
    fun() ->
        {ok, OutSock} = gen_udp:open(0, [{active, false}, binary]),
        gen_udp:send(OutSock, localhost, ?PORT, Data)
    end,
  spawn(Sender).

1 Response
Add your response

My previous comment seems to be deleted.

Thank you for posting this, it is exactly the type of thing I was needing. Earlier it was crashing the erl VM every time I ran test_stuff(), but now it seems to be working, I have no idea why.

I also noticed what I think is a bug in the:

%% Clause as originally posted. TO takes the value of the last expression of
%% whichever case branch matches, so in the success branch it is the result
%% of `catch ets:update_counter(...)', not a fixed timeout.
handle_info(timeout, #state{ id=ID, sock=Sock } = State) ->
  TO = case gen_udp:recv(Sock, 4) of
    {error, _} -> 10;
    _Data ->
      catch ets:insert_new(?MODULE, {ID, 0}),
      catch ets:update_counter(?MODULE, ID, 1)
  end,
  {noreply, State, TO}.

code: you are setting the timeout to the result of ets:update_counter, which is probably not what you want. I changed it to:

%% Revised clause: the success branch now ends with the literal 10, so TO is
%% 10 in both branches and no longer depends on the counter value.
handle_info(timeout, #state{ id=ID, sock=Sock } = State) ->
  TO = case gen_udp:recv(Sock, 4) of
    {error, _} -> 10;
    _Data ->
      catch ets:insert_new(?MODULE, {ID, 0}),
      catch ets:update_counter(?MODULE, ID, 1),
      10 % <--- TO = 10 not  TO = ets:update_counter(..)
  end,
  {noreply, State, TO}.

and the throughput of the server doubled.

Thanks again for this gem, there is a complete lack of use of UDP in the Erlang literature.

over 1 year ago ·