123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- -module(ebb_work_manager).
- -behaviour(gen_server).
- %% API
- -export([start/0, start/1, start_link/0, start_link/1]).
- -export([start_work/1, start_work/3, work_finished/1, work_finished/2,
- available_node/0, add_node/1, remove_node/1, remove_node/2,
- scavenge/0, scavenge/1,
- get_node_queue/0, get_node_list/0]).
- %% gen_server callbacks
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
- -define(SERVER, ebb_work_manager).
- -record(state, {node_queue}).
- %%====================================================================
- %% API
- %%====================================================================
- %%--------------------------------------------------------------------
- %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
- %% Description: Starts the server
- %%--------------------------------------------------------------------
%% start/0: starts the work manager via the supervisor, seeding it with
%% a node list that contains only the local node.
start() ->
    LocalOnly = [node()],
    start(LocalOnly).

%% start/1: hands Args unchanged to ebb_work_supv:start_link/1 and
%% returns whatever that call returns.
start(Args) ->
    ebb_work_supv:start_link(Args).
%% start_link/0: starts the server with only the local node in its queue.
start_link() ->
    LocalOnly = [node()],
    start_link(LocalOnly).

%% start_link/1: starts the gen_server, registered locally as ?SERVER.
%%   scavenge - start with the local node, then (only if the start
%%              succeeded) cast a world-wide scavenge request.
%%   Nodes    - start with the given initial node list.
%% Returns the result of gen_server:start_link/4 unchanged.
start_link(scavenge) ->
    Result = gen_server:start_link({local, ?SERVER}, ?MODULE, [node()], []),
    case Result of
        {ok, _Pid} ->
            %% The server is registered now, so the cast will reach it.
            scavenge(),
            Result;
        _NotStarted ->
            Result
    end;
start_link(Nodes) ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, Nodes, []).
%% start_work/1: asynchronously tells the local server that work has
%% started on Node (the server raises that node's priority by one).
start_work(Node) ->
    Msg = {start_work, Node},
    gen_server:cast(?SERVER, Msg).

%% start_work/3: synchronously asks the local server to start
%% Func/Args on an available node on behalf of Receiver; returns the
%% server's reply.
start_work(Func, Args, Receiver) ->
    Request = {start_work, Func, Args, Receiver},
    gen_server:call(?SERVER, Request).
%% work_finished/1: asynchronously tells the local server that a unit
%% of work on Node has finished (the server lowers its priority by one).
work_finished(Node) ->
    Msg = {work_finished, Node},
    gen_server:cast(?SERVER, Msg).

%% work_finished/2: same notification, but addressed to the server
%% registered as ?SERVER on the node Root.
work_finished(Root, Node) ->
    Target = {?SERVER, Root},
    gen_server:cast(Target, {work_finished, Node}).
%% available_node/0: synchronously asks the local server for a node
%% that currently answers a ping; returns that node.
available_node() ->
    Request = available_node,
    gen_server:call(?SERVER, Request).
%% add_node/1: asynchronously asks the local server to add Node to its
%% queue (the server skips nodes it already tracks).
add_node(Node) ->
    Msg = {add_node, Node},
    gen_server:cast(?SERVER, Msg).
%% remove_node/1: asynchronously asks the local server to drop Node
%% from its queue.
remove_node(Node) ->
    Msg = {remove_node, Node},
    gen_server:cast(?SERVER, Msg).

%% remove_node/2: same request, but addressed to the server registered
%% as ?SERVER on the node Root.
remove_node(Root, Node) ->
    Target = {?SERVER, Root},
    gen_server:cast(Target, {remove_node, Node}).
%% scavenge/0: asynchronously asks the local server to look for nodes
%% using the `world` domain.
scavenge() ->
    scavenge_request(world).

%% scavenge/1: asynchronously asks the local server to look for nodes
%% in Domain. The server accepts `world`, `local`, a list of hosts, or
%% a single host.
scavenge(Domain) ->
    scavenge_request(Domain).

%% Sends the scavenge cast shared by scavenge/0 and scavenge/1.
scavenge_request(Domain) ->
    gen_server:cast(?SERVER, {scavenge, Domain}).
%% get_node_queue/0: synchronously fetches the server's current node
%% queue as-is (a priority_queue value).
get_node_queue() ->
    Request = get_node_queue,
    gen_server:call(?SERVER, Request).
%% get_node_list/0: fetches the server's node queue and converts it
%% with priority_queue:to_list/1.
get_node_list() ->
    Queue = get_node_queue(),
    priority_queue:to_list(Queue).
- %%====================================================================
- %% gen_server callbacks
- %%====================================================================
- %%--------------------------------------------------------------------
- %% Function: init(Args) -> {ok, State} |
- %% {ok, State, Timeout} |
- %% ignore |
- %% {stop, Reason}
- %% Description: Initiates the server
- %%--------------------------------------------------------------------
%% init/1: builds the initial server state.
%%   []    - start with an empty node queue.
%%   Nodes - start with every given node filed under priority 0.
init([]) ->
    {ok, #state{node_queue = priority_queue:empty()}};
init(Nodes) ->
    InitialQueue = priority_queue:from_orddict([{0, Nodes}]),
    {ok, #state{node_queue = InitialQueue}}.
- %%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
- %% {reply, Reply, State, Timeout} |
- %% {noreply, State} |
- %% {noreply, State, Timeout} |
- %% {stop, Reason, Reply, State} |
- %% {stop, Reason, State}
- %% Description: Handling call messages
- %%--------------------------------------------------------------------
%% handle_call/3: synchronous requests.
%%   {start_work, Func, Args, Receiver}
%%       Starts the work via start_work_from_queue/4, replies with its
%%       result and keeps the queue it returns.
%%   available_node
%%       Replies with a node found by find_available_node/1 and keeps
%%       the queue it returns.
%%   get_node_queue
%%       Replies with the current queue; state is unchanged.
%% There is deliberately no catch-all clause: an unknown request
%% crashes the server with function_clause.
handle_call({start_work, Func, Args, Receiver}, _From,
            #state{node_queue = Nodes} = State) ->
    {Reply, Updated} = start_work_from_queue(Func, Args, Receiver, Nodes),
    {reply, Reply, State#state{node_queue = Updated}};
handle_call(available_node, _From, #state{node_queue = Nodes} = State) ->
    {_Priority, Node, Pruned} = find_available_node(Nodes),
    {reply, Node, State#state{node_queue = Pruned}};
handle_call(get_node_queue, _From, #state{node_queue = Nodes} = State) ->
    {reply, Nodes, State}.
- %%--------------------------------------------------------------------
- %% Function: handle_cast(Msg, State) -> {noreply, State} |
- %% {noreply, State, Timeout} |
- %% {stop, Reason, State}
- %% Description: Handling cast messages
- %%--------------------------------------------------------------------
%% handle_cast/2: asynchronous requests. Every clause replaces the
%% node queue held in the state and never replies.
%%   {start_work, Node}    - raise Node's priority by one.
%%   {work_finished, Node} - lower Node's priority by one.
%%   {add_node, Node}      - add Node unless it is already queued.
%%   {remove_node, Node}   - delete Node from the queue.
%%   {scavenge, Domain}    - discover nodes for Domain and add the new ones.
handle_cast({start_work, Node}, #state{node_queue = Nodes} = State) ->
    Busier = priority_queue:modify_priority(Node, fun(P) -> P + 1 end, Nodes),
    {noreply, State#state{node_queue = Busier}};
handle_cast({work_finished, Node}, #state{node_queue = Nodes} = State) ->
    Freer = priority_queue:modify_priority(Node, fun(P) -> P - 1 end, Nodes),
    {noreply, State#state{node_queue = Freer}};
handle_cast({add_node, Node}, #state{node_queue = Nodes} = State) ->
    {noreply, State#state{node_queue = new_node(Node, Nodes)}};
handle_cast({remove_node, Node}, #state{node_queue = Nodes} = State) ->
    Without = priority_queue:delete_value(Node, Nodes),
    {noreply, State#state{node_queue = Without}};
handle_cast({scavenge, Domain}, #state{node_queue = Nodes} = State) ->
    Discovered = discover_nodes(Domain),
    {noreply, State#state{node_queue = new_nodes(Discovered, Nodes)}}.

%% Resolves a scavenge domain to a list of nodes using net_adm.
%%   world - net_adm:world/0.
%%   local - only the local host, as named by net_adm:localhost/0.
%%   list  - the given hosts.
%%   other - treated as a single host.
discover_nodes(world) ->
    net_adm:world();
discover_nodes(local) ->
    net_adm:world_list([list_to_atom(net_adm:localhost())]);
discover_nodes(Hosts) when is_list(Hosts) ->
    net_adm:world_list(Hosts);
discover_nodes(Host) ->
    net_adm:world_list([Host]).
- %%--------------------------------------------------------------------
- %% Function: handle_info(Info, State) -> {noreply, State} |
- %% {noreply, State, Timeout} |
- %% {stop, Reason, State}
- %% Description: Handling all non call/cast messages
- %%--------------------------------------------------------------------
%% handle_info/2: any message that is neither a call nor a cast is
%% dropped and the state is left untouched.
handle_info(_Unexpected, Unchanged) ->
    {noreply, Unchanged}.
- %%--------------------------------------------------------------------
- %% Function: terminate(Reason, State) -> void()
- %% Description: This function is called by a gen_server when it is about to
- %% terminate. It should be the opposite of Module:init/1 and do any necessary
- %% cleaning up. When it returns, the gen_server terminates with Reason.
- %% The return value is ignored.
- %%--------------------------------------------------------------------
%% terminate/2: the server holds nothing that needs cleaning up, so
%% this always returns ok regardless of reason or state.
terminate(_Why, _FinalState) ->
    ok.
- %%--------------------------------------------------------------------
- %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
- %% Description: Convert process state when code is changed
- %%--------------------------------------------------------------------
%% code_change/3: the state needs no conversion on upgrade, so it is
%% returned as-is.
code_change(_FromVsn, Current, _Extra) ->
    {ok, Current}.
- %%--------------------------------------------------------------------
- %%% Internal functions
- %%--------------------------------------------------------------------
%% find_available_node/1: takes nodes from Queue in take_minimum order
%% and pings each one until a node answers.
%%
%% Returns {Priority, Node, QueueOut} for the first node that answers
%% pong. QueueOut still CONTAINS that node; only the unreachable nodes
%% examined before it have been dropped.
%%
%% NOTE(review): if the queue is empty or no node answers, the match on
%% take_minimum/1 presumably fails and crashes the caller - depends on
%% what priority_queue:take_minimum/1 returns for an empty queue; confirm.
find_available_node(Queue) ->
    {Priority, Candidate, Remaining} = priority_queue:take_minimum(Queue),
    case net_adm:ping(Candidate) of
        pang ->
            %% Unreachable: continue with the queue that no longer has it.
            find_available_node(Remaining);
        pong ->
            %% Reachable: hand back the queue that still includes it.
            {Priority, Candidate, Queue}
    end.
%% start_work_from_queue/4: starts Func/Args for Receiver on the first
%% reachable node on which ebb_worker_bridge:start_link/4 succeeds.
%%
%% Returns {Reply, Queue2}: Reply is the successful {ok, _} result and
%% Queue2 is the queue with that node's priority raised by one.
%%
%% find_available_node/1 returns a queue that still contains the node
%% it picked, so:
%%   * on success the node's existing entry is updated in place with
%%     modify_priority/3 (inserting it again would create a duplicate
%%     entry for the same node);
%%   * on failure the node is deleted before retrying, otherwise the
%%     same reachable-but-failing node would be picked again and the
%%     server would loop forever.
%%
%% NOTE(review): when every node has been tried and dropped, the call
%% into find_available_node/1 presumably crashes on the empty queue -
%% confirm against priority_queue:take_minimum/1.
start_work_from_queue(Func, Args, Receiver, Queue) ->
    {_Priority, Node, Live} = find_available_node(Queue),
    case ebb_worker_bridge:start_link(Node, Func, Args, Receiver) of
        {ok, _} = Reply ->
            Busier = priority_queue:modify_priority(
                       Node, fun(P) -> P + 1 end, Live),
            {Reply, Busier};
        _Error ->
            Without = priority_queue:delete_value(Node, Live),
            start_work_from_queue(Func, Args, Receiver, Without)
    end.
%% new_nodes/2: adds each node in Nodes to Queue through new_node/2,
%% left to right, and returns the resulting queue.
new_nodes(Nodes, Queue) ->
    lists:foldl(fun new_node/2, Queue, Nodes).
%% new_node/2: returns Queue with Node present. A node that is not yet
%% queued is inserted at priority 0; a node that is already queued
%% keeps its current priority and the queue is returned unchanged.
new_node(Node, Queue) ->
    case priority_queue:find_value(Node, Queue) of
        none ->
            priority_queue:insert(0, Node, Queue);
        {_Priority, Node} ->
            Queue
    end.
|