Ver código fonte

Fixed start_link bug causing ebb_worker crashes to bring down the whole system.

Paul Downen 13 anos atrás
pai
commit
9e29483a34

+ 1 - 1
src/ebb_func_fsm.erl

@@ -48,7 +48,7 @@ start_link(Func = #func{}, Mode, Receiver)
 %%--------------------------------------------------------------------
 init({Func, Mode, Receiver}) ->
     Call = fun(Code, Args, Return) ->
-		   ebb_worker_supv:start_link(Mode, Code, Args, Return)
+		   ebb_work_supv:start_work(Mode, Code, Args, Return)
 	   end,
     State = #state{func=Func, args=?DICT:new(), call=Call,
 		   receiver=Receiver},

+ 7 - 3
src/ebb_operation_fsm.erl

@@ -117,9 +117,13 @@ finished(cleanup, State) ->
 %% name as the current state name StateName is called to handle the event.
 %%--------------------------------------------------------------------
 
-running(return_out, From, State = #state{return_requests=Requests}) ->
-    State2 = State#state{return_requests=[From|Requests]},
-    {next_state, running, State2}.
+running(return_out, From,
+	State = #state{out_arity=Arity, return_requests=Requests}) ->
+    case Arity of
+	0 -> {reply, [], running, State};
+	_ -> State2 = State#state{return_requests=[From|Requests]},
+	     {next_state, running, State2}
+    end.
 
 finished(return_out, _From, State = #state{output=Output}) ->
     Reply = format_output(Output),

+ 55 - 8
src/ebb_work_manager.erl

@@ -3,9 +3,10 @@
 -behaviour(gen_server).
 
 %% API
--export([start_link/0, start_link/1]).
--export([start_work/3, work_finished/1,
-	 add_node/1, remove_node/1, scavenge/0,
+-export([start/0, start/1, start_link/0, start_link/1]).
+-export([start_work/1, start_work/3, work_finished/1, work_finished/2,
+	 available_node/0, add_node/1, remove_node/1, remove_node/2,
+	 scavenge/0, scavenge/1,
 	 get_node_queue/0, get_node_list/0]).
 
 %% gen_server callbacks
@@ -23,6 +24,12 @@
 %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
 %% Description: Starts the server
 %%--------------------------------------------------------------------
+start() ->
+    start([node()]).
+
+start(Args) ->
+    ebb_work_supv:start_link(Args).
+
 start_link() ->
     start_link([node()]).
 
@@ -37,20 +44,35 @@ start_link(scavenge) ->
 start_link(Nodes) ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, Nodes, []).
 
+start_work(Node) ->
+    gen_server:cast(?SERVER, {start_work, Node}).
+
 start_work(Func, Args, Receiver) ->
     gen_server:call(?SERVER, {start_work, Func, Args, Receiver}).
 
 work_finished(Node) ->
     gen_server:cast(?SERVER, {work_finished, Node}).
 
+work_finished(Root, Node) ->
+    gen_server:cast({?SERVER, Root}, {work_finished, Node}).
+
+available_node() ->
+    gen_server:call(?SERVER, available_node).
+
 add_node(Node) ->
     gen_server:cast(?SERVER, {add_node, Node}).
 
 remove_node(Node) ->
     gen_server:cast(?SERVER, {remove_node, Node}).
 
+remove_node(Root, Node) ->
+    gen_server:cast({?SERVER, Root}, {remove_node, Node}).
+
 scavenge() ->
-    gen_server:cast(?SERVER, scavenge).
+    gen_server:cast(?SERVER, {scavenge, world}).
+
+scavenge(Domain) ->
+    gen_server:cast(?SERVER, {scavenge, Domain}).
 
 get_node_queue() ->
     gen_server:call(?SERVER, get_node_queue).
@@ -90,6 +112,10 @@ handle_call({start_work, Func, Args, Receiver}, _From,
     {Reply, Queue2} = start_work_from_queue(Func, Args, Receiver, Queue),
     State2 = State#state{node_queue=Queue2},
     {reply, Reply, State2};
+handle_call(available_node, _From, State = #state{node_queue=Queue}) ->
+    {_Priority, Node, Queue2} = find_available_node(Queue),
+    State2 = State#state{node_queue=Queue2},
+    {reply, Node, State2};
 handle_call(get_node_queue, _From, State = #state{node_queue=Queue}) ->
     Reply = Queue,
     {reply, Reply, State}.
@@ -100,6 +126,10 @@ handle_call(get_node_queue, _From, State = #state{node_queue=Queue}) ->
 %%                                      {stop, Reason, State}
 %% Description: Handling cast messages
 %%--------------------------------------------------------------------
+handle_cast({start_work, Node}, State = #state{node_queue=Queue}) ->
+    Queue2 = priority_queue:modify_priority(Node, fun(P) -> P+1 end, Queue),
+    State2 = State#state{node_queue=Queue2},
+    {noreply, State2};
 handle_cast({work_finished, Node}, State = #state{node_queue=Queue}) ->
     Queue2 = priority_queue:modify_priority(Node, fun(P) -> P-1 end, Queue),
     State2 = State#state{node_queue=Queue2},
@@ -112,8 +142,17 @@ handle_cast({remove_node, Node}, State = #state{node_queue=Queue}) ->
     Queue2 = priority_queue:delete_value(Node, Queue),
     State2 = State#state{node_queue=Queue2},
     {noreply, State2};
-handle_cast(scavenge, State = #state{node_queue=Queue}) ->
-    World = net_adm:world(),
+handle_cast({scavenge, Domain}, State = #state{node_queue=Queue}) ->
+    World = case Domain of
+		world ->
+		    net_adm:world();
+		local ->
+		    net_adm:world_list([list_to_atom(net_adm:localhost())]);
+		Hosts when is_list(Hosts) ->
+		    net_adm:world_list(Hosts);
+		Host ->
+		    net_adm:world_list([Host])
+	    end,
     Queue2 = new_nodes(World, Queue),
     State2 = State#state{node_queue=Queue2},
     {noreply, State2}.
@@ -148,9 +187,17 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%--------------------------------------------------------------------
 
-start_work_from_queue(Func, Args, Receiver, Queue) ->
+find_available_node(Queue) ->
     {Priority, Node, Queue2} = priority_queue:take_minimum(Queue),
-    Reply = ebb_worker_bridge:start_link(Node, Func, Args, Receiver),
+    case net_adm:ping(Node) of
+	pong -> {Priority, Node, Queue};
+	pang -> find_available_node(Queue2)
+    end.
+
+start_work_from_queue(Func, Args, Receiver, Queue) ->
+    {Priority, Node, Queue2} = find_available_node(Queue),
+    Reply = ebb_worker_bridge:start_link(
+	      Node, Func, Args, Receiver),
     case Reply of
 	{ok, _} -> 
 	    Queue3 = priority_queue:insert(Priority+1, Node, Queue2),

+ 54 - 0
src/ebb_work_supv.erl

@@ -0,0 +1,54 @@
+-module(ebb_work_supv).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/1, start_work/4]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%====================================================================
+%% API functions
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link(Args) -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the supervisor, locally registered as ?SERVER
+%%--------------------------------------------------------------------
+start_link(Args) ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, Args).
+
+start_work(local, Func, Args, Receiver) ->
+    Worker = {make_ref(), {ebb_worker, start_link,
+			   [Func, Args, Receiver]},
+	      transient, brutal_kill, worker, [ebb_worker]},
+    supervisor:start_child(?SERVER, Worker);
+
+start_work(distributed, Func, Args, Receiver) ->
+    Worker = {make_ref(), {ebb_worker, start_link,
+			   [distributed, Func, Args, Receiver]},
+	      transient, brutal_kill, supervisor, [ebb_worker]},
+    supervisor:start_child(?SERVER, Worker).
+
+%%====================================================================
+%% Supervisor callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Func: init(Args) -> {ok,  {SupFlags,  [ChildSpec]}} |
+%%                     ignore                          |
+%%                     {error, Reason}
+%% Description: Whenever a supervisor is started using 
+%% supervisor:start_link/[2,3], this function is called by the new process 
+%% to find out about restart strategy, maximum restart frequency and child 
+%% specifications.
+%%--------------------------------------------------------------------
+init(Args) ->
+    Manager = {ebb_work_manager, {ebb_work_manager, start_link, [Args]},
+	      permanent, 2000, worker, [ebb_work_manager]},
+    {ok, {{one_for_one, 5, 5}, [Manager]}}.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================

+ 7 - 1
src/ebb_worker.erl

@@ -13,9 +13,15 @@ start_link(Func, Args, Receiver) ->
 	Error -> Error
     end.
 
+start_link(distributed, Func, Args, Receiver) ->
+    Node = ebb_work_manager:available_node(),
+    start_link(Node, Func, Args, Receiver);
 start_link(Node, Func, Args, Receiver) ->
     try
-	spawn_link(Node, ?MODULE, worker, [Node, Func, Args, Receiver])
+	Result = spawn_link(Node, ?MODULE, worker,
+			    [Node, Func, Args, Receiver]),
+	ebb_work_manager:start_work(Node),
+	Result
     of
 	Pid -> {ok, Pid}
     catch

+ 8 - 6
src/ebb_worker_bridge.erl

@@ -22,6 +22,10 @@
 start_link(Func, Args, Receiver) ->
     supervisor_bridge:start_link(?MODULE, {Func, Args, Receiver}).
 
+start_link(distributed, Func, Args, Receiver) ->
+    Node = ebb_work_manager:available_node(),
+    ebb_work_manager:start_work(Node),
+    start_link(Node, Func, Args, Receiver);
 start_link(Node, Func, Args, Receiver) ->
     supervisor_bridge:start_link(?MODULE, {Node, Func, Args, Receiver}).
 
@@ -58,13 +62,11 @@ init({Node, Func, Args, Receiver}) ->
 %% about to terminate. It should be the opposite of Module:init/1 and stop
 %% the subsystem and do any necessary cleaning up. The return value is ignored.
 %%--------------------------------------------------------------------
-terminate(normal, _State) ->
+terminate(_Reason, #state{mode=local}) ->
     ok;
-terminate(noconnection, #state{mode={distributed, Node}}) ->
-    ebb_work_manager:remove_node(Node),
-    exit(noconnection);
-terminate(Reason, _State) ->
-    exit(Reason).
+terminate(_Reason, #state{mode={distributed, Node}}) ->
+    ebb_work_manager:work_finished(Node),
+    ok.
 
 %%====================================================================
 %% Internal functions

+ 3 - 3
src/ebb_worker_supv.erl

@@ -32,13 +32,13 @@ start_link(Mode, Func, Args, Receiver) ->
 %%--------------------------------------------------------------------
 init({local, Func, Args, Receiver}) ->
     Worker = {'worker',
-	      {ebb_worker_bridge, start_link, [Func, Args, Receiver]},
-	      transient, 2000, supervisor, [ebb_worker_bridge]},
+	      {ebb_worker, start_link, [Func, Args, Receiver]},
+	      transient, 2000, worker, [ebb_worker]},
     {ok, {{one_for_one, 5, 1}, [Worker]}};
 init({distributed, Func, Args, Receiver}) ->
     Worker = {'worker',
 	      {ebb_work_manager, start_work, [Func, Args, Receiver]},
-	      transient, 2000, supervisor, [ebb_work_manager]},
+	      temporary, 2000, worker, [ebb_work_manager]},
     {ok, {{one_for_one, 5, 1}, [Worker]}}.
 
 %%====================================================================