%%% File: ebb_operation_fsm.erl
  1. -module(ebb_operation_fsm).
  2. -behaviour(gen_fsm).
  3. -include("../include/ebb_prim.hrl").
  4. %% API
  5. -export([start_link/1, start_link/2]).
  6. -export([send_in/2, send_in/3, return_out/1, cleanup/1]).
  7. %% gen_fsm callbacks
  8. -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
  9. terminate/3, code_change/4]).
  10. %% state callbacks
  11. -export([running/2, running/3, finished/2, finished/3]).
  12. -define(DICT, orddict).
  13. -record(state, {mode, operation, out_arity, output, return_requests}).
  14. %%====================================================================
  15. %% API
  16. %%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(Op) | start_link(Op, Mode) ->
%%               {ok,Pid} | ignore | {error,Error}
%% Description: Creates a gen_fsm process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this function
%% does not return until Module:init/1 has returned.
%%--------------------------------------------------------------------
  23. start_link(Op) ->
  24. start_link(Op, local).
  25. start_link(Op, Mode) ->
  26. gen_fsm:start_link(?MODULE, {Op, Mode}, []).
  27. send_in(Op, N, Arg) ->
  28. ebb_event:in(Op, N, Arg).
  29. send_in(Op, Args) ->
  30. lists:foreach(fun({N, Arg}) -> send_in(Op, N, Arg) end,
  31. lists:zip(lists:seq(1, length(Args)), Args)).
  32. return_out(Op) ->
  33. gen_fsm:sync_send_event(Op, return_out, infinity).
  34. cleanup(Op) ->
  35. gen_fsm:send_event(Op, cleanup).
  36. %%====================================================================
  37. %% gen_fsm callbacks
  38. %%====================================================================
  39. %%--------------------------------------------------------------------
  40. %% Function: init(Args) -> {ok, StateName, State} |
  41. %% {ok, StateName, State, Timeout} |
  42. %% ignore |
  43. %% {stop, StopReason}
  44. %% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
  45. %% gen_fsm:start_link/3,4, this function is called by the new process to
  46. %% initialize.
  47. %%--------------------------------------------------------------------
  48. init({Op, Mode}) ->
  49. State = #state{mode=Mode, out_arity=ebb_prim:out_arity(Op),
  50. output=?DICT:new(), return_requests=[]},
  51. case start_operation(Op, Mode, self()) of
  52. {ok, Pid} -> {ok, running, State#state{operation=Pid}};
  53. ignore -> {ok, running, State};
  54. Error -> Error
  55. end.
  56. %%--------------------------------------------------------------------
  57. %% Function:
  58. %% state_name(Event, State) -> {next_state, NextStateName, NextState}|
  59. %% {next_state, NextStateName,
  60. %% NextState, Timeout} |
  61. %% {stop, Reason, NewState}
  62. %% Description:There should be one instance of this function for each possible
  63. %% state name. Whenever a gen_fsm receives an event sent using
  64. %% gen_fsm:send_event/2, the instance of this function with the same name as
  65. %% the current state name StateName is called to handle the event. It is also
  66. %% called if a timeout occurs.
  67. %%--------------------------------------------------------------------
  68. running({out, N, Val},
  69. State = #state{out_arity=Arity, output=Output,
  70. return_requests=Requests})
  71. when N > 0, N =< Arity ->
  72. Output2 = ?DICT:store(N, Val, Output),
  73. State2 = State#state{output=Output2},
  74. case is_done(State2) of
  75. true -> send_out_requests(Requests, Output2),
  76. {next_state, finished, State2#state{return_requests=[]}};
  77. false -> {next_state, running, State2}
  78. end;
  79. running({in, N, Arg}, State = #state{operation=Pid}) ->
  80. ebb_event:in(Pid, N, Arg),
  81. {next_state, running, State};
  82. running(cleanup, State) ->
  83. {next_state, running, State}.
  84. finished(cleanup, State) ->
  85. {stop, normal, State}.
  86. %%--------------------------------------------------------------------
  87. %% Function:
  88. %% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
  89. %% {next_state, NextStateName,
  90. %% NextState, Timeout} |
  91. %% {reply, Reply, NextStateName, NextState}|
  92. %% {reply, Reply, NextStateName,
  93. %% NextState, Timeout} |
  94. %% {stop, Reason, NewState}|
  95. %% {stop, Reason, Reply, NewState}
  96. %% Description: There should be one instance of this function for each
  97. %% possible state name. Whenever a gen_fsm receives an event sent using
  98. %% gen_fsm:sync_send_event/2,3, the instance of this function with the same
  99. %% name as the current state name StateName is called to handle the event.
  100. %%--------------------------------------------------------------------
  101. running(return_out, From, State = #state{return_requests=Requests}) ->
  102. State2 = State#state{return_requests=[From|Requests]},
  103. {next_state, running, State2}.
  104. finished(return_out, _From, State = #state{output=Output}) ->
  105. Reply = format_output(Output),
  106. {reply, Reply, finished, State}.
  107. %%--------------------------------------------------------------------
  108. %% Function:
  109. %% handle_event(Event, StateName, State) -> {next_state, NextStateName,
  110. %% NextState} |
  111. %% {next_state, NextStateName,
  112. %% NextState, Timeout} |
  113. %% {stop, Reason, NewState}
  114. %% Description: Whenever a gen_fsm receives an event sent using
  115. %% gen_fsm:send_all_state_event/2, this function is called to handle
  116. %% the event.
  117. %%--------------------------------------------------------------------
  118. handle_event(_Event, StateName, State) ->
  119. {next_state, StateName, State}.
  120. %%--------------------------------------------------------------------
  121. %% Function:
  122. %% handle_sync_event(Event, From, StateName,
  123. %% State) -> {next_state, NextStateName, NextState} |
  124. %% {next_state, NextStateName, NextState,
  125. %% Timeout} |
  126. %% {reply, Reply, NextStateName, NextState}|
  127. %% {reply, Reply, NextStateName, NextState,
  128. %% Timeout} |
  129. %% {stop, Reason, NewState} |
  130. %% {stop, Reason, Reply, NewState}
  131. %% Description: Whenever a gen_fsm receives an event sent using
  132. %% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
  133. %% the event.
  134. %%--------------------------------------------------------------------
  135. handle_sync_event(_Event, _From, StateName, State) ->
  136. Reply = ok,
  137. {reply, Reply, StateName, State}.
  138. %%--------------------------------------------------------------------
  139. %% Function:
  140. %% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
  141. %% {next_state, NextStateName, NextState,
  142. %% Timeout} |
  143. %% {stop, Reason, NewState}
  144. %% Description: This function is called by a gen_fsm when it receives any
  145. %% other message than a synchronous or asynchronous event
  146. %% (or a system message).
  147. %%--------------------------------------------------------------------
  148. handle_info(_Info, StateName, State) ->
  149. {next_state, StateName, State}.
  150. %%--------------------------------------------------------------------
  151. %% Function: terminate(Reason, StateName, State) -> void()
  152. %% Description:This function is called by a gen_fsm when it is about
  153. %% to terminate. It should be the opposite of Module:init/1 and do any
  154. %% necessary cleaning up. When it returns, the gen_fsm terminates with
  155. %% Reason. The return value is ignored.
  156. %%--------------------------------------------------------------------
  157. terminate(_Reason, _StateName, _State) ->
  158. ok.
  159. %%--------------------------------------------------------------------
  160. %% Function:
  161. %% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
  162. %% Description: Convert process state when code is changed
  163. %%--------------------------------------------------------------------
  164. code_change(_OldVsn, StateName, State, _Extra) ->
  165. {ok, StateName, State}.
  166. %%--------------------------------------------------------------------
  167. %%% Internal functions
  168. %%--------------------------------------------------------------------
  169. start_operation(Func = #func{}, Mode, Receiver) ->
  170. ebb_func_fsm:start_link(Func, Mode, Receiver);
  171. start_operation(Value = #value{}, _Mode, Receiver) ->
  172. ebb_value_fsm:start_link(Value, Receiver);
  173. start_operation(Pipe = #pipe{}, Mode, Receiver) ->
  174. ebb_pipe_fsm:start_link(
  175. Pipe, fun(Op, Ret) -> start_operation(Op, Mode, Ret) end, Receiver);
  176. start_operation(Par = #par{}, Mode, Receiver) ->
  177. ebb_par_fsm:start_link(
  178. Par, fun(Op, Ret) -> start_operation(Op, Mode, Ret) end, Receiver);
  179. start_operation(Route = #route{}, _Mode, Receiver) ->
  180. ebb_route_fsm:start_link(Route, Receiver);
  181. start_operation(Sync = #sync{}, _Mode, Receiver) ->
  182. ebb_sync_fsm:start_link(Sync, Receiver);
  183. start_operation(Split = #split{}, _Mode, Receiver) ->
  184. ebb_split_fsm:start_link(Split, Receiver);
  185. start_operation(Merge = #merge{}, _Mode, Receiver) ->
  186. ebb_merge_fsm:start_link(Merge, Receiver);
  187. start_operation(Switch = #switch{}, Mode, Receiver) ->
  188. ebb_switch_fsm:start_link(
  189. Switch, fun(Op, Ret) -> start_operation(Op, Mode, Ret) end, Receiver).
  190. is_done(#state{out_arity=Arity, output=Output}) ->
  191. ?DICT:size(Output) == Arity.
  192. format_output(Output) ->
  193. [ V || {_, V} <- lists:keysort(1, ?DICT:to_list(Output)) ].
  194. send_out_requests(Requests, Output) ->
  195. Out = format_output(Output),
  196. lists:foreach(fun(From) -> gen_fsm:reply(From, Out) end, Requests).