-module(ebb_work_manager). -behaviour(gen_server). %% API -export([start/0, start/1, start_link/0, start_link/1]). -export([start_work/1, start_work/3, work_finished/1, work_finished/2, available_node/0, add_node/1, remove_node/1, remove_node/2, scavenge/0, scavenge/1, get_node_queue/0, get_node_list/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(SERVER, ebb_work_manager). -record(state, {node_queue}). %%==================================================================== %% API %%==================================================================== %%-------------------------------------------------------------------- %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the server %%-------------------------------------------------------------------- start() -> start([node()]). start(Args) -> ebb_work_supv:start_link(Args). start_link() -> start_link([node()]). start_link(scavenge) -> case gen_server:start_link({local, ?SERVER}, ?MODULE, [node()], []) of {ok, Pid} -> scavenge(), {ok, Pid}; Error -> Error end; start_link(Nodes) -> gen_server:start_link({local, ?SERVER}, ?MODULE, Nodes, []). start_work(Node) -> gen_server:cast(?SERVER, {start_work, Node}). start_work(Func, Args, Receiver) -> gen_server:call(?SERVER, {start_work, Func, Args, Receiver}). work_finished(Node) -> gen_server:cast(?SERVER, {work_finished, Node}). work_finished(Root, Node) -> gen_server:cast({?SERVER, Root}, {work_finished, Node}). available_node() -> gen_server:call(?SERVER, available_node). add_node(Node) -> gen_server:cast(?SERVER, {add_node, Node}). remove_node(Node) -> gen_server:cast(?SERVER, {remove_node, Node}). remove_node(Root, Node) -> gen_server:cast({?SERVER, Root}, {remove_node, Node}). scavenge() -> gen_server:cast(?SERVER, {scavenge, world}). scavenge(Domain) -> gen_server:cast(?SERVER, {scavenge, Domain}). get_node_queue() -> gen_server:call(?SERVER, get_node_queue). get_node_list() -> priority_queue:to_list(get_node_queue()). %%==================================================================== %% gen_server callbacks %%==================================================================== %%-------------------------------------------------------------------- %% Function: init(Args) -> {ok, State} | %% {ok, State, Timeout} | %% ignore | %% {stop, Reason} %% Description: Initiates the server %%-------------------------------------------------------------------- init([]) -> Queue = priority_queue:empty(), {ok, #state{node_queue=Queue}}; init(Nodes) -> Queue = priority_queue:from_orddict([{0, Nodes}]), {ok, #state{node_queue=Queue}}. %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | %% {reply, Reply, State, Timeout} | %% {noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, Reply, State} | %% {stop, Reason, State} %% Description: Handling call messages %%-------------------------------------------------------------------- handle_call({start_work, Func, Args, Receiver}, _From, State = #state{node_queue=Queue}) -> {Reply, Queue2} = start_work_from_queue(Func, Args, Receiver, Queue), State2 = State#state{node_queue=Queue2}, {reply, Reply, State2}; handle_call(available_node, _From, State = #state{node_queue=Queue}) -> {_Priority, Node, Queue2} = find_available_node(Queue), State2 = State#state{node_queue=Queue2}, {reply, Node, State2}; handle_call(get_node_queue, _From, State = #state{node_queue=Queue}) -> Reply = Queue, {reply, Reply, State}. %%-------------------------------------------------------------------- %% Function: handle_cast(Msg, State) -> {noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% Description: Handling cast messages %%-------------------------------------------------------------------- handle_cast({start_work, Node}, State = #state{node_queue=Queue}) -> Queue2 = priority_queue:modify_priority(Node, fun(P) -> P+1 end, Queue), State2 = State#state{node_queue=Queue2}, {noreply, State2}; handle_cast({work_finished, Node}, State = #state{node_queue=Queue}) -> Queue2 = priority_queue:modify_priority(Node, fun(P) -> P-1 end, Queue), State2 = State#state{node_queue=Queue2}, {noreply, State2}; handle_cast({add_node, Node}, State = #state{node_queue=Queue}) -> Queue2 = new_node(Node, Queue), State2 = State#state{node_queue=Queue2}, {noreply, State2}; handle_cast({remove_node, Node}, State = #state{node_queue=Queue}) -> Queue2 = priority_queue:delete_value(Node, Queue), State2 = State#state{node_queue=Queue2}, {noreply, State2}; handle_cast({scavenge, Domain}, State = #state{node_queue=Queue}) -> World = case Domain of world -> net_adm:world(); local -> net_adm:world_list([list_to_atom(net_adm:localhost())]); Hosts when is_list(Hosts) -> net_adm:world_list(Hosts); Host -> net_adm:world_list([Host]) end, Queue2 = new_nodes(World, Queue), State2 = State#state{node_queue=Queue2}, {noreply, State2}. %%-------------------------------------------------------------------- %% Function: handle_info(Info, State) -> {noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% Description: Handling all non call/cast messages %%-------------------------------------------------------------------- handle_info(_Info, State) -> {noreply, State}. %%-------------------------------------------------------------------- %% Function: terminate(Reason, State) -> void() %% Description: This function is called by a gen_server when it is about to %% terminate. It should be the opposite of Module:init/1 and do any necessary %% cleaning up. When it returns, the gen_server terminates with Reason. %% The return value is ignored. %%-------------------------------------------------------------------- terminate(_Reason, _State) -> ok. %%-------------------------------------------------------------------- %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} %% Description: Convert process state when code is changed %%-------------------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}. %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- find_available_node(Queue) -> {Priority, Node, Queue2} = priority_queue:take_minimum(Queue), case net_adm:ping(Node) of pong -> {Priority, Node, Queue}; pang -> find_available_node(Queue2) end. start_work_from_queue(Func, Args, Receiver, Queue) -> {Priority, Node, Queue2} = find_available_node(Queue), Reply = ebb_worker_bridge:start_link( Node, Func, Args, Receiver), case Reply of {ok, _} -> Queue3 = priority_queue:insert(Priority+1, Node, Queue2), {Reply, Queue3}; _Error -> start_work_from_queue(Func, Args, Receiver, Queue2) end. new_nodes(Nodes, Queue) -> lists:foldl(fun(Node, Q) -> new_node(Node, Q) end, Queue, Nodes). new_node(Node, Queue) -> case priority_queue:find_value(Node, Queue) of {_Priority, Node} -> Queue; none -> priority_queue:insert(0, Node, Queue) end.