[フレーム]
Last Updated: February 25, 2016
·
4.002K
· nekrograve

Receiving data from single UDP socket with more than one erlang process

Sometimes there could be the storm of UDP packets hitting your app, but gen_udp docs does not provide any clues on how to balance processing among several processes. But it's possible to share Socket between workers and call gen_udp:recv in a loop, so potential bottleneck could be eliminated this way.

Here is a samplegen_server based handler and some code at the bottom for starting test instances.

-module(udp_test).
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-define(PORT, 9876).

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------

-export([start_link/2, send/1,
 test_stuff/0, start_stuff/1]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------

-export([init/1, handle_call/3, handle_cast/2, 
 handle_info/2, terminate/2, code_change/3]).

-record(state, { id=0, sock }).

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

start_link(ID, Sock) ->
 gen_server:start_link(?MODULE, [{id, ID}, {socket, Sock}], []).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

init(Args) ->
 ID = proplists:get_value(id, Args, 1),
 case proplists:get_value(socket, Args) of
 undefined -> 
 {stop, nosock};
 Sock -> 
 {ok, #state{ id=ID, sock=Sock }, 0}
 end.

handle_call(_Request, _From, State) ->
 {stop, {error, unknownmsg}, State}.

handle_cast(_Request, State) ->
 {stop, {error, unknownmsg}, State}.

handle_info(timeout, #state{ sock=undefined } = State) ->
 {noreply, State, 10};
handle_info(timeout, #state{ id=ID, sock=Sock } = State) ->
 TO = case gen_udp:recv(Sock, 4) of
 {error, _} -> 10;
 _Data ->
 catch ets:insert_new(?MODULE, {ID, 0}),
 catch ets:update_counter(?MODULE, ID, 1)
 end,
 {noreply, State, TO}.

terminate(_Reason, _State) ->
 ok.

code_change(_OldVsn, State, _Extra) ->
 {ok, State}.

%% ------------------------------------------------------------------
%% Test Function Definitions
%% ------------------------------------------------------------------

test_stuff() -> 
 start_stuff(10), 
 [udp_test:send("fooobaar") || _ <- lists:seq(1,1000)],
 timer:sleep(1000),
 io:format("Total: ~p~n", [ets:tab2list(?MODULE)]),
 init:stop().

start_stuff(N) -> 
 ets:new(?MODULE, [named_table, public]),
 {ok, S} = gen_udp:open(?PORT, [{active, false}, binary]),
 [start_link(ID, S) || ID <- lists:seq(1, N)].

send(Data) ->
 spawn(fun() ->
 {ok, S} = gen_udp:open(0, [{active, false}, binary]),
 gen_udp:send(S, localhost,?PORT, Data)
 end).

1 Response
Add your response

My previous comment seems to be deleted.

Thank you for posting this, it is exactly the type of thing I was needing. Earlier it was crashing the erl VM every time I ran test_stuff(), but now it seems to be working, I have no idea why.

I also noticed what I think to be is a bug in the the:

handle_info(timeout, #state{ id=ID, sock=Sock } = State) ->
 TO = case gen_udp:recv(Sock, 4) of
 {error, _} -> 10;
 _Data ->
 catch ets:insert_new(?MODULE, {ID, 0}),
 catch ets:update_counter(?MODULE, ID, 1)
 end,
 {noreply, State, TO}.

code you are setting the timeout to the result of the ets:update_counter, probably not what you want, I changed it to:

handle_info(timeout, #state{ id=ID, sock=Sock } = State) ->
 TO = case gen_udp:recv(Sock, 4) of
 {error, _} -> 10;
 _Data ->
 catch ets:insert_new(?MODULE, {ID, 0}),
 catch ets:update_counter(?MODULE, ID, 1),
 10 % <--- TO = 10 not TO = ets:update_counter(..)
 end,
 {noreply, State, TO}.

and the throughput of the server doubled.

Thanks again for this gem, there is a complete lack of use of UDP in the Erlang literature.

over 1 year ago ·

AltStyle によって変換されたページ (->オリジナル) /