%% ebb_work_manager.erl
  -module(ebb_work_manager).
  -behaviour(gen_server).

  %% API: server lifecycle
  -export([start_link/0, start_link/1]).

  %% API: work dispatch and node bookkeeping
  -export([
      start_work/3,
      work_finished/1,
      add_node/1,
      remove_node/1,
      scavenge/0,
      get_node_queue/0,
      get_node_list/0
  ]).

  %% gen_server callbacks
  -export([
      init/1,
      handle_call/3,
      handle_cast/2,
      handle_info/2,
      terminate/2,
      code_change/3
  ]).

  %% Name under which the server process is registered locally
  %% (identical to the module name).
  -define(SERVER, ?MODULE).

  %% Server state. node_queue holds the priority_queue of worker nodes; a
  %% node's priority is raised by one when work is started on it and lowered
  %% by one when work_finished/1 is reported for it.
  -record(state, {node_queue}).
  %%====================================================================
  %% API
  %%====================================================================
  %%--------------------------------------------------------------------
  %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
  %% Description: Starts the server with the local node (node()) as the
  %% only entry in its node list. Delegates to start_link/1.
  %%--------------------------------------------------------------------
  start_link() ->
      LocalNodes = [node()],
      start_link(LocalNodes).
  %%--------------------------------------------------------------------
  %% Function: start_link(scavenge | Nodes) -> {ok,Pid} | ignore | {error,Error}
  %% Description: Starts the server, registered locally as ?SERVER.
  %%   scavenge - start with the local node only and, once the server is
  %%              up, send it an asynchronous scavenge request.
  %%   Nodes    - any other term is handed to init/1 unchanged as the
  %%              initial node list.
  %%--------------------------------------------------------------------
  start_link(scavenge) ->
      Result = gen_server:start_link({local, ?SERVER}, ?MODULE, [node()], []),
      case Result of
          {ok, _Pid} ->
              %% Only ask for a scavenge when the server actually started.
              scavenge(),
              Result;
          _NotStarted ->
              Result
      end;
  start_link(Nodes) ->
      ServerName = {local, ?SERVER},
      gen_server:start_link(ServerName, ?MODULE, Nodes, []).
  %%--------------------------------------------------------------------
  %% Function: start_work(Func, Args, Receiver) -> Reply
  %% Description: Synchronously asks the server to start a unit of work.
  %% Func, Args and Receiver are forwarded untouched; Reply is whatever the
  %% server's start_work handler replies with.
  %%--------------------------------------------------------------------
  start_work(Func, Args, Receiver) ->
      Request = {start_work, Func, Args, Receiver},
      gen_server:call(?SERVER, Request).
  %%--------------------------------------------------------------------
  %% Function: work_finished(Node) -> ok
  %% Description: Asynchronously tells the server that a unit of work on
  %% Node has completed.
  %%--------------------------------------------------------------------
  work_finished(Node) ->
      Notice = {work_finished, Node},
      gen_server:cast(?SERVER, Notice).
  %%--------------------------------------------------------------------
  %% Function: add_node(Node) -> ok
  %% Description: Asynchronously asks the server to add Node to its node
  %% queue.
  %%--------------------------------------------------------------------
  add_node(Node) ->
      Request = {add_node, Node},
      gen_server:cast(?SERVER, Request).
  %%--------------------------------------------------------------------
  %% Function: remove_node(Node) -> ok
  %% Description: Asynchronously asks the server to remove Node from its
  %% node queue.
  %%--------------------------------------------------------------------
  remove_node(Node) ->
      Request = {remove_node, Node},
      gen_server:cast(?SERVER, Request).
  %%--------------------------------------------------------------------
  %% Function: scavenge() -> ok
  %% Description: Asynchronously sends the scavenge request to the server.
  %%--------------------------------------------------------------------
  scavenge() ->
      Request = scavenge,
      gen_server:cast(?SERVER, Request).
  %%--------------------------------------------------------------------
  %% Function: get_node_queue() -> Queue
  %% Description: Synchronously fetches the server's current node queue.
  %%--------------------------------------------------------------------
  get_node_queue() ->
      Request = get_node_queue,
      gen_server:call(?SERVER, Request).
  %%--------------------------------------------------------------------
  %% Function: get_node_list() -> term()
  %% Description: Fetches the server's node queue and converts it with
  %% priority_queue:to_list/1.
  %%--------------------------------------------------------------------
  get_node_list() ->
      Queue = get_node_queue(),
      priority_queue:to_list(Queue).
  %%====================================================================
  %% gen_server callbacks
  %%====================================================================
  %%--------------------------------------------------------------------
  %% Function: init(Nodes) -> {ok, State}
  %% Description: Initiates the server.
  %%   []    - the state starts with an empty priority queue.
  %%   Nodes - the queue is built from the orddict [{0, Nodes}].
  %%           NOTE(review): presumably this places every node at
  %%           priority 0 - confirm against priority_queue:from_orddict/1.
  %%--------------------------------------------------------------------
  init([]) ->
      {ok, #state{node_queue = priority_queue:empty()}};
  init(Nodes) ->
      Initial = [{0, Nodes}],
      {ok, #state{node_queue = priority_queue:from_orddict(Initial)}}.
  %%--------------------------------------------------------------------
  %% Function: handle_call(Request, From, State) -> {reply, Reply, State}
  %% Description: Handling call messages.
  %%   {start_work, Func, Args, Receiver} - starts the work via
  %%       start_work_from_queue/4, replies with its result and stores the
  %%       queue it hands back.
  %%   get_node_queue - replies with the current node queue; the state is
  %%       left unchanged.
  %% Any other request is unmatched and fails with function_clause.
  %%--------------------------------------------------------------------
  handle_call({start_work, Func, Args, Receiver}, _From,
              #state{node_queue = Nodes} = State) ->
      {Reply, Remaining} = start_work_from_queue(Func, Args, Receiver, Nodes),
      {reply, Reply, State#state{node_queue = Remaining}};
  handle_call(get_node_queue, _From, #state{node_queue = Nodes} = State) ->
      {reply, Nodes, State}.
  %%--------------------------------------------------------------------
  %% Function: handle_cast(Msg, State) -> {noreply, State}
  %% Description: Handling cast messages.
  %%   {work_finished, Node} - lowers Node's priority by one.
  %%   {add_node, Node}      - inserts Node with priority 0.
  %%   {remove_node, Node}   - deletes Node from the queue.
  %%   scavenge              - placeholder; the queue is stored back
  %%                           unchanged.
  %% Any other message is unmatched and fails with function_clause.
  %%--------------------------------------------------------------------
  handle_cast({work_finished, Node}, #state{node_queue = Nodes} = State) ->
      Decrement = fun(Priority) -> Priority - 1 end,
      Updated = priority_queue:modify_priority(Node, Decrement, Nodes),
      {noreply, State#state{node_queue = Updated}};
  handle_cast({add_node, Node}, #state{node_queue = Nodes} = State) ->
      Updated = priority_queue:insert(0, Node, Nodes),
      {noreply, State#state{node_queue = Updated}};
  handle_cast({remove_node, Node}, #state{node_queue = Nodes} = State) ->
      Updated = priority_queue:delete_value(Node, Nodes),
      {noreply, State#state{node_queue = Updated}};
  handle_cast(scavenge, #state{node_queue = Nodes} = State) ->
      %% Scavenging of nodes is not implemented yet.
      {noreply, State#state{node_queue = Nodes}}.
  %%--------------------------------------------------------------------
  %% Function: handle_info(Info, State) -> {noreply, State}
  %% Description: Handling all non call/cast messages. Every such message
  %% is ignored and the state is returned unchanged.
  %%--------------------------------------------------------------------
  handle_info(_Message, Unchanged) ->
      {noreply, Unchanged}.
  %%--------------------------------------------------------------------
  %% Function: terminate(Reason, State) -> ok
  %% Description: Called by the gen_server when it is about to terminate.
  %% This module performs no cleanup here; both arguments are ignored and
  %% the return value is ignored by gen_server.
  %%--------------------------------------------------------------------
  terminate(_Why, _FinalState) ->
      ok.
  %%--------------------------------------------------------------------
  %% Function: code_change(OldVsn, State, Extra) -> {ok, NewState}
  %% Description: Convert process state when code is changed. No
  %% conversion is performed: the state is returned as it was received.
  %%--------------------------------------------------------------------
  code_change(_FromVsn, Current, _Extra) ->
      {ok, Current}.
  %%--------------------------------------------------------------------
  %%% Internal functions
  %%--------------------------------------------------------------------
  %%--------------------------------------------------------------------
  %% Function: start_work_from_queue(Func, Args, Receiver, Queue) ->
  %%               {Reply, NewQueue}
  %% Description: Takes the minimum-priority node from Queue and starts a
  %% worker bridge for Func/Args/Receiver on it.
  %%   - If the bridge returns {ok, _}, that result is the Reply and the
  %%     node is put back with its priority raised by one.
  %%   - If the bridge returns anything else, the node is left out of the
  %%     queue and the next minimum-priority node is tried.
  %%   - If no node can be taken, the Reply is {error, no_nodes_available}
  %%     and the queue is returned as it was. Previously the result of
  %%     take_minimum/1 was matched unconditionally against a 3-tuple, so
  %%     an exhausted queue (reachable via init([]) or after every node
  %%     failed) crashed the server with badmatch.
  %%--------------------------------------------------------------------
  start_work_from_queue(Func, Args, Receiver, Queue) ->
      case priority_queue:take_minimum(Queue) of
          {Priority, Node, Rest} ->
              start_work_on_node(Func, Args, Receiver, Priority, Node, Rest);
          _NoNode ->
              %% NOTE(review): assumes take_minimum/1 returns a non-3-tuple
              %% for an empty queue - confirm. If it raises instead, this
              %% clause is never reached and the crash behaviour is
              %% unchanged.
              {{error, no_nodes_available}, Queue}
      end.

  %% Tries to start the work on one specific node that has already been
  %% taken out of the queue; falls back to the remaining nodes on failure.
  start_work_on_node(Func, Args, Receiver, Priority, Node, Rest) ->
      case ebb_worker_bridge:start_link(Node, Func, Args, Receiver) of
          {ok, _} = Started ->
              {Started, priority_queue:insert(Priority + 1, Node, Rest)};
          _Error ->
              %% The failing node is deliberately not re-inserted, so each
              %% retry works on a strictly smaller queue.
              start_work_from_queue(Func, Args, Receiver, Rest)
      end.