123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- -module(ebb_work_manager).
- -behaviour(gen_server).
- %% API
- -export([start_link/0, start_link/1]).
- -export([start_work/3, work_finished/1,
- add_node/1, remove_node/1, scavenge/0,
- get_node_queue/0, get_node_list/0]).
- %% gen_server callbacks
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
- -define(SERVER, ebb_work_manager).
- -record(state, {node_queue}).
- %%====================================================================
- %% API
- %%====================================================================
- %%--------------------------------------------------------------------
- %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
- %% Description: Starts the server
- %%--------------------------------------------------------------------
- start_link() ->
- start_link([node()]).
- start_link(scavenge) ->
- case gen_server:start_link({local, ?SERVER}, ?MODULE, [node()], []) of
- {ok, Pid} ->
- scavenge(),
- {ok, Pid};
- Error ->
- Error
- end;
- start_link(Nodes) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, Nodes, []).
- start_work(Func, Args, Receiver) ->
- gen_server:call(?SERVER, {start_work, Func, Args, Receiver}).
- work_finished(Node) ->
- gen_server:cast(?SERVER, {work_finished, Node}).
- add_node(Node) ->
- gen_server:cast(?SERVER, {add_node, Node}).
- remove_node(Node) ->
- gen_server:cast(?SERVER, {remove_node, Node}).
- scavenge() ->
- gen_server:cast(?SERVER, scavenge).
- get_node_queue() ->
- gen_server:call(?SERVER, get_node_queue).
- get_node_list() ->
- priority_queue:to_list(get_node_queue()).
- %%====================================================================
- %% gen_server callbacks
- %%====================================================================
- %%--------------------------------------------------------------------
- %% Function: init(Args) -> {ok, State} |
- %% {ok, State, Timeout} |
- %% ignore |
- %% {stop, Reason}
- %% Description: Initiates the server
- %%--------------------------------------------------------------------
- init([]) ->
- Queue = priority_queue:empty(),
- {ok, #state{node_queue=Queue}};
- init(Nodes) ->
- Queue = priority_queue:from_orddict([{0, Nodes}]),
- {ok, #state{node_queue=Queue}}.
- %%--------------------------------------------------------------------
- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
- %% {reply, Reply, State, Timeout} |
- %% {noreply, State} |
- %% {noreply, State, Timeout} |
- %% {stop, Reason, Reply, State} |
- %% {stop, Reason, State}
- %% Description: Handling call messages
- %%--------------------------------------------------------------------
- handle_call({start_work, Func, Args, Receiver}, _From,
- State = #state{node_queue=Queue}) ->
- {Reply, Queue2} = start_work_from_queue(Func, Args, Receiver, Queue),
- State2 = State#state{node_queue=Queue2},
- {reply, Reply, State2};
- handle_call(get_node_queue, _From, State = #state{node_queue=Queue}) ->
- Reply = Queue,
- {reply, Reply, State}.
- %%--------------------------------------------------------------------
- %% Function: handle_cast(Msg, State) -> {noreply, State} |
- %% {noreply, State, Timeout} |
- %% {stop, Reason, State}
- %% Description: Handling cast messages
- %%--------------------------------------------------------------------
- handle_cast({work_finished, Node}, State = #state{node_queue=Queue}) ->
- Queue2 = priority_queue:modify_priority(Node, fun(P) -> P-1 end, Queue),
- State2 = State#state{node_queue=Queue2},
- {noreply, State2};
- handle_cast({add_node, Node}, State = #state{node_queue=Queue}) ->
- Queue2 = priority_queue:insert(0, Node, Queue),
- State2 = State#state{node_queue=Queue2},
- {noreply, State2};
- handle_cast({remove_node, Node}, State = #state{node_queue=Queue}) ->
- Queue2 = priority_queue:delete_value(Node, Queue),
- State2 = State#state{node_queue=Queue2},
- {noreply, State2};
- handle_cast(scavenge, State = #state{node_queue=Queue}) ->
- % Scavenge nodes here
- Queue2 = Queue,
- State2 = State#state{node_queue=Queue2},
- {noreply, State2}.
- %%--------------------------------------------------------------------
- %% Function: handle_info(Info, State) -> {noreply, State} |
- %% {noreply, State, Timeout} |
- %% {stop, Reason, State}
- %% Description: Handling all non call/cast messages
- %%--------------------------------------------------------------------
- handle_info(_Info, State) ->
- {noreply, State}.
- %%--------------------------------------------------------------------
- %% Function: terminate(Reason, State) -> void()
- %% Description: This function is called by a gen_server when it is about to
- %% terminate. It should be the opposite of Module:init/1 and do any necessary
- %% cleaning up. When it returns, the gen_server terminates with Reason.
- %% The return value is ignored.
- %%--------------------------------------------------------------------
- terminate(_Reason, _State) ->
- ok.
- %%--------------------------------------------------------------------
- %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
- %% Description: Convert process state when code is changed
- %%--------------------------------------------------------------------
- code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
- %%--------------------------------------------------------------------
- %%% Internal functions
- %%--------------------------------------------------------------------
- start_work_from_queue(Func, Args, Receiver, Queue) ->
- {Priority, Node, Queue2} = priority_queue:take_minimum(Queue),
- Reply = ebb_worker_bridge:start_link(Node, Func, Args, Receiver),
- case Reply of
- {ok, _} ->
- Queue3 = priority_queue:insert(Priority+1, Node, Queue2),
- {Reply, Queue3};
- _Error ->
- start_work_from_queue(Func, Args, Receiver, Queue2)
- end.
|