%% ebb_work_manager.erl
  -module(ebb_work_manager).
  -behaviour(gen_server).

  %% API: starting the server
  -export([
      start/0,
      start/1,
      start_link/0,
      start_link/1
  ]).

  %% API: work accounting, node management and inspection
  -export([
      start_work/1,
      start_work/3,
      work_finished/1,
      work_finished/2,
      available_node/0,
      add_node/1,
      remove_node/1,
      remove_node/2,
      scavenge/0,
      scavenge/1,
      get_node_queue/0,
      get_node_list/0
  ]).

  %% gen_server callbacks
  -export([
      init/1,
      handle_call/3,
      handle_cast/2,
      handle_info/2,
      terminate/2,
      code_change/3
  ]).

  %% Name under which the server process is registered (the module name).
  -define(SERVER, ?MODULE).

  %% Server state: node_queue holds the priority_queue of known nodes.
  -record(state, {node_queue}).
  %%====================================================================
  %% API
  %%====================================================================
  %%--------------------------------------------------------------------
  %% Function: start() / start(Args)
  %% Description: Starts the work manager through its supervisor.
  %% start/0 uses a node list containing only the local node; start/1
  %% hands Args to ebb_work_supv:start_link/1 and returns its result.
  %%--------------------------------------------------------------------
  start() ->
      LocalOnly = [node()],
      start(LocalOnly).

  start(Args) ->
      ebb_work_supv:start_link(Args).
  %%--------------------------------------------------------------------
  %% Function: start_link() / start_link(scavenge | Nodes)
  %%               -> whatever gen_server:start_link/4 returns
  %% Description: Starts the server registered locally as ?SERVER.
  %% start_link/0 seeds it with the local node only. start_link(scavenge)
  %% does the same and, once the server is up, casts a scavenge request
  %% to it. start_link(Nodes) passes Nodes to init/1 as the initial nodes.
  %%--------------------------------------------------------------------
  start_link() ->
      start_link([node()]).

  start_link(scavenge) ->
      %% [node()] is not the atom scavenge, so this lands in the Nodes clause.
      Started = start_link([node()]),
      case Started of
          {ok, _Pid} ->
              scavenge(),
              Started;
          _NotStarted ->
              Started
      end;
  start_link(Nodes) ->
      gen_server:start_link({local, ?SERVER}, ?MODULE, Nodes, []).
  %%--------------------------------------------------------------------
  %% Function: start_work(Node) -> ok
  %%           start_work(Func, Args, Receiver) -> Reply
  %% Description: start_work/1 asynchronously tells the server that work
  %% has started on Node (the server raises that node's priority by one).
  %% start_work/3 synchronously asks the server to start Func/Args on a
  %% node of its choosing and returns the server's reply.
  %%--------------------------------------------------------------------
  start_work(Node) ->
      Notice = {start_work, Node},
      gen_server:cast(?SERVER, Notice).

  start_work(Func, Args, Receiver) ->
      Request = {start_work, Func, Args, Receiver},
      gen_server:call(?SERVER, Request).
  %%--------------------------------------------------------------------
  %% Function: work_finished(Node) -> ok
  %%           work_finished(Root, Node) -> ok
  %% Description: Asynchronously reports that work on Node has finished
  %% (the server lowers that node's priority by one). The one-argument
  %% form addresses the locally registered server; the two-argument form
  %% addresses the server registered as ?SERVER on the node Root.
  %%--------------------------------------------------------------------
  work_finished(Node) ->
      Notice = {work_finished, Node},
      gen_server:cast(?SERVER, Notice).

  work_finished(Root, Node) ->
      Manager = {?SERVER, Root},
      gen_server:cast(Manager, {work_finished, Node}).
  %%--------------------------------------------------------------------
  %% Function: available_node() -> Node
  %% Description: Synchronously asks the server for the node selected by
  %% its available_node handler.
  %%--------------------------------------------------------------------
  available_node() ->
      Request = available_node,
      gen_server:call(?SERVER, Request).
  %%--------------------------------------------------------------------
  %% Function: add_node(Node) -> ok
  %% Description: Asynchronously asks the server to add Node to its queue.
  %%--------------------------------------------------------------------
  add_node(Node) ->
      Notice = {add_node, Node},
      gen_server:cast(?SERVER, Notice).
  %%--------------------------------------------------------------------
  %% Function: remove_node(Node) -> ok
  %%           remove_node(Root, Node) -> ok
  %% Description: Asynchronously asks the server to drop Node from its
  %% queue. The one-argument form addresses the locally registered
  %% server; the two-argument form addresses the server registered as
  %% ?SERVER on the node Root.
  %%--------------------------------------------------------------------
  remove_node(Node) ->
      Notice = {remove_node, Node},
      gen_server:cast(?SERVER, Notice).

  remove_node(Root, Node) ->
      Manager = {?SERVER, Root},
      gen_server:cast(Manager, {remove_node, Node}).
  %%--------------------------------------------------------------------
  %% Function: scavenge() -> ok
  %%           scavenge(Domain) -> ok
  %% Description: Asynchronously asks the server to look for nodes and
  %% add them to its queue. scavenge/0 uses the domain 'world'. Domain
  %% may be 'world', 'local', a list of hosts, or a single host, as
  %% handled by the {scavenge, Domain} cast.
  %%--------------------------------------------------------------------
  scavenge() ->
      Notice = {scavenge, world},
      gen_server:cast(?SERVER, Notice).

  scavenge(Domain) ->
      Notice = {scavenge, Domain},
      gen_server:cast(?SERVER, Notice).
  %%--------------------------------------------------------------------
  %% Function: get_node_queue() -> Queue
  %% Description: Synchronously fetches the server's current node queue.
  %%--------------------------------------------------------------------
  get_node_queue() ->
      Request = get_node_queue,
      gen_server:call(?SERVER, Request).
  %%--------------------------------------------------------------------
  %% Function: get_node_list() -> List
  %% Description: Fetches the server's node queue and converts it with
  %% priority_queue:to_list/1.
  %%--------------------------------------------------------------------
  get_node_list() ->
      Queue = get_node_queue(),
      priority_queue:to_list(Queue).
  %%====================================================================
  %% gen_server callbacks
  %%====================================================================
  %%--------------------------------------------------------------------
  %% Function: init(Nodes) -> {ok, State}
  %% Description: Initiates the server. Nodes is the list of initially
  %% known nodes; the state's node_queue is built from it.
  %%--------------------------------------------------------------------
  init(Nodes) ->
      {ok, #state{node_queue = initial_queue(Nodes)}}.

  %% Builds the starting queue: empty when no nodes are given, otherwise
  %% every given node is entered at priority 0.
  initial_queue([]) ->
      priority_queue:empty();
  initial_queue(Nodes) ->
      priority_queue:from_orddict([{0, Nodes}]).
  %%--------------------------------------------------------------------
  %% Function: handle_call(Request, From, State) -> {reply, Reply, State}
  %% Description: Handling call messages
  %%   {start_work, Func, Args, Receiver}
  %%       starts the work on a node taken from the queue; replies with
  %%       the result of start_work_from_queue/4 and stores the queue it
  %%       returns.
  %%   available_node
  %%       replies with the node found by find_available_node/1 and
  %%       stores the queue it returns.
  %%   get_node_queue
  %%       replies with the current queue; the state is left unchanged.
  %%--------------------------------------------------------------------
  handle_call({start_work, Func, Args, Receiver}, _From,
              #state{node_queue = Nodes} = State) ->
      {Reply, Updated} = start_work_from_queue(Func, Args, Receiver, Nodes),
      {reply, Reply, State#state{node_queue = Updated}};
  handle_call(available_node, _From, #state{node_queue = Nodes} = State) ->
      {_Priority, Node, Updated} = find_available_node(Nodes),
      {reply, Node, State#state{node_queue = Updated}};
  handle_call(get_node_queue, _From, #state{node_queue = Nodes} = State) ->
      {reply, Nodes, State}.
  %%--------------------------------------------------------------------
  %% Function: handle_cast(Msg, State) -> {noreply, State}
  %% Description: Handling cast messages
  %%   {start_work, Node}    raises Node's priority by one.
  %%   {work_finished, Node} lowers Node's priority by one.
  %%   {add_node, Node}      adds Node through new_node/2.
  %%   {remove_node, Node}   deletes Node from the queue.
  %%   {scavenge, Domain}    looks up nodes for Domain and adds them
  %%                         through new_nodes/2.
  %%--------------------------------------------------------------------
  handle_cast({start_work, Node}, #state{node_queue = Nodes} = State) ->
      Raise = fun(Priority) -> Priority + 1 end,
      Updated = priority_queue:modify_priority(Node, Raise, Nodes),
      {noreply, State#state{node_queue = Updated}};
  handle_cast({work_finished, Node}, #state{node_queue = Nodes} = State) ->
      Lower = fun(Priority) -> Priority - 1 end,
      Updated = priority_queue:modify_priority(Node, Lower, Nodes),
      {noreply, State#state{node_queue = Updated}};
  handle_cast({add_node, Node}, #state{node_queue = Nodes} = State) ->
      {noreply, State#state{node_queue = new_node(Node, Nodes)}};
  handle_cast({remove_node, Node}, #state{node_queue = Nodes} = State) ->
      Updated = priority_queue:delete_value(Node, Nodes),
      {noreply, State#state{node_queue = Updated}};
  handle_cast({scavenge, Domain}, #state{node_queue = Nodes} = State) ->
      Found = scavenge_domain(Domain),
      {noreply, State#state{node_queue = new_nodes(Found, Nodes)}}.

  %% Resolves a scavenge domain to a list of nodes using net_adm:
  %% 'world' uses net_adm:world/0, 'local' queries only this machine's
  %% host name, a list is treated as a list of hosts, and anything else
  %% as a single host.
  scavenge_domain(world) ->
      net_adm:world();
  scavenge_domain(local) ->
      net_adm:world_list([list_to_atom(net_adm:localhost())]);
  scavenge_domain(Hosts) when is_list(Hosts) ->
      net_adm:world_list(Hosts);
  scavenge_domain(Host) ->
      net_adm:world_list([Host]).
  %%--------------------------------------------------------------------
  %% Function: handle_info(Info, State) -> {noreply, State}
  %% Description: Handling all non call/cast messages. Every such
  %% message is ignored and the state is kept as it is.
  %%--------------------------------------------------------------------
  handle_info(_Message, State) ->
      {noreply, State}.
  %%--------------------------------------------------------------------
  %% Function: terminate(Reason, State) -> ok
  %% Description: Called by the gen_server when it is about to
  %% terminate. This server has nothing to clean up, so it only
  %% returns ok.
  %%--------------------------------------------------------------------
  terminate(_Why, _FinalState) ->
      ok.
  %%--------------------------------------------------------------------
  %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
  %% Description: Convert process state when code is changed. No
  %% conversion is performed; the state is returned unchanged.
  %%--------------------------------------------------------------------
  code_change(_FromVersion, State, _Extra) ->
      {ok, State}.
  %%--------------------------------------------------------------------
  %%% Internal functions
  %%--------------------------------------------------------------------
  %%--------------------------------------------------------------------
  %% Function: find_available_node(Queue) -> {Priority, Node, NewQueue}
  %% Description: Takes the minimum-priority node from Queue and pings
  %% it. A node that answers is returned together with its priority and
  %% the queue it was taken from, so the returned queue STILL contains
  %% that node. A node that does not answer is left out and the search
  %% continues with the remaining entries.
  %%--------------------------------------------------------------------
  find_available_node(Queue) ->
      {Priority, Candidate, Rest} = priority_queue:take_minimum(Queue),
      check_candidate(net_adm:ping(Candidate), {Priority, Candidate}, Queue, Rest).

  %% Dispatches on the ping result: keep the candidate (pong) or continue
  %% the search in the queue without it (pang).
  check_candidate(pong, {Priority, Node}, Queue, _Rest) ->
      {Priority, Node, Queue};
  check_candidate(pang, _Unreachable, _Queue, Rest) ->
      find_available_node(Rest).
  %%--------------------------------------------------------------------
  %% Function: start_work_from_queue(Func, Args, Receiver, Queue)
  %%               -> {{ok, Result}, NewQueue}
  %% Description: Picks a reachable node with find_available_node/1 and
  %% starts a worker on it through ebb_worker_bridge:start_link/4.
  %%
  %% find_available_node/1 returns a queue that still contains the
  %% chosen node, so:
  %%   * on success the node's existing entry is bumped by one with
  %%     modify_priority (the same operation the {start_work, Node} cast
  %%     uses) instead of inserting a second entry for the node;
  %%   * on failure the node is deleted before retrying, otherwise the
  %%     retry would pick the very same minimum-priority node again and
  %%     could loop forever inside the server. A dropped node can be
  %%     re-added later through add_node/1 or scavenge/0,1.
  %%
  %% NOTE(review): what happens once the queue runs out of nodes depends
  %% on priority_queue:take_minimum/1 for an empty queue, which is not
  %% visible in this module.
  %%--------------------------------------------------------------------
  start_work_from_queue(Func, Args, Receiver, Queue) ->
      {_Priority, Node, Queue2} = find_available_node(Queue),
      case ebb_worker_bridge:start_link(Node, Func, Args, Receiver) of
          {ok, _} = Reply ->
              Queue3 = priority_queue:modify_priority(
                         Node, fun(P) -> P + 1 end, Queue2),
              {Reply, Queue3};
          _Error ->
              Queue3 = priority_queue:delete_value(Node, Queue2),
              start_work_from_queue(Func, Args, Receiver, Queue3)
      end.
  %%--------------------------------------------------------------------
  %% Function: new_nodes(Nodes, Queue) -> NewQueue
  %% Description: Adds every node in Nodes to Queue, one at a time and
  %% in list order, through new_node/2.
  %%--------------------------------------------------------------------
  new_nodes(Nodes, Queue) ->
      lists:foldl(fun new_node/2, Queue, Nodes).
  %%--------------------------------------------------------------------
  %% Function: new_node(Node, Queue) -> NewQueue
  %% Description: Makes sure Node is in Queue. A node that is not found
  %% is inserted at priority 0; a node that is already present leaves
  %% the queue untouched, keeping its current priority.
  %%--------------------------------------------------------------------
  new_node(Node, Queue) ->
      Lookup = priority_queue:find_value(Node, Queue),
      case Lookup of
          none ->
              priority_queue:insert(0, Node, Queue);
          {_Existing, Node} ->
              Queue
      end.