-module(ebb_operation_fsm). -behaviour(gen_fsm). -include("../include/ebb_prim.hrl"). %% API -export([start_link/1, start_link/2]). -export([send_in/2, send_in/3, return_out/1, cleanup/1]). %% gen_fsm callbacks -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). %% state callbacks -export([running/2, running/3, finished/2, finished/3]). -define(DICT, orddict). -record(state, {mode, operation, out_arity, output, return_requests, pending_cleanup}). %%==================================================================== %% API %%==================================================================== %%-------------------------------------------------------------------- %% Function: start_link() -> ok,Pid} | ignore | {error,Error} %% Description:Creates a gen_fsm process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this function %% does not return until Module:init/1 has returned. %%-------------------------------------------------------------------- start_link(Op) -> start_link(Op, local). start_link(Op, Mode) -> gen_fsm:start_link(?MODULE, {Op, Mode}, []). send_in(Op, N, Arg) -> ebb_event:in(Op, N, Arg). send_in(Op, Args) -> lists:foreach(fun({N, Arg}) -> send_in(Op, N, Arg) end, lists:zip(lists:seq(1, length(Args)), Args)). return_out(Op) -> gen_fsm:sync_send_event(Op, return_out, infinity). cleanup(Op) -> gen_fsm:send_event(Op, cleanup). %%==================================================================== %% gen_fsm callbacks %%==================================================================== %%-------------------------------------------------------------------- %% Function: init(Args) -> {ok, StateName, State} | %% {ok, StateName, State, Timeout} | %% ignore | %% {stop, StopReason} %% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or %% gen_fsm:start_link/3,4, this function is called by the new process to %% initialize. %%-------------------------------------------------------------------- init({Op, Mode}) -> State = #state{mode=Mode, out_arity=ebb_prim:out_arity(Op), output=?DICT:new(), return_requests=[], pending_cleanup=false}, case start_operation(Op, Mode, self()) of {ok, Pid} -> {ok, running, State#state{operation=Pid}}; ignore -> {ok, running, State}; Error -> Error end. %%-------------------------------------------------------------------- %% Function: %% state_name(Event, State) -> {next_state, NextStateName, NextState}| %% {next_state, NextStateName, %% NextState, Timeout} | %% {stop, Reason, NewState} %% Description:There should be one instance of this function for each possible %% state name. Whenever a gen_fsm receives an event sent using %% gen_fsm:send_event/2, the instance of this function with the same name as %% the current state name StateName is called to handle the event. It is also %% called if a timeout occurs. %%-------------------------------------------------------------------- running({out, N, Val}, State = #state{out_arity=Arity, output=Output, return_requests=Requests, pending_cleanup=Clean}) when N > 0, N =< Arity -> Output2 = ?DICT:store(N, Val, Output), State2 = State#state{output=Output2}, case is_done(State2) of true -> send_out_requests(Requests, Output2), State3 = State2#state{return_requests=[]}, case Clean of true -> {stop, normal, State3}; false -> {next_state, finished, State3} end; false -> {next_state, running, State2} end; running({in, N, Arg}, State = #state{operation=Pid}) -> ebb_event:in(Pid, N, Arg), {next_state, running, State}; running(cleanup, State) -> State2 = State#state{pending_cleanup=true}, {next_state, running, State2}. finished(cleanup, State) -> {stop, normal, State}. %%-------------------------------------------------------------------- %% Function: %% state_name(Event, From, State) -> {next_state, NextStateName, NextState} | %% {next_state, NextStateName, %% NextState, Timeout} | %% {reply, Reply, NextStateName, NextState}| %% {reply, Reply, NextStateName, %% NextState, Timeout} | %% {stop, Reason, NewState}| %% {stop, Reason, Reply, NewState} %% Description: There should be one instance of this function for each %% possible state name. Whenever a gen_fsm receives an event sent using %% gen_fsm:sync_send_event/2,3, the instance of this function with the same %% name as the current state name StateName is called to handle the event. %%-------------------------------------------------------------------- running(return_out, From, State = #state{out_arity=Arity, return_requests=Requests}) -> case Arity of 0 -> {reply, [], running, State}; _ -> State2 = State#state{return_requests=[From|Requests]}, {next_state, running, State2} end. finished(return_out, _From, State = #state{output=Output}) -> Reply = format_output(Output), {reply, Reply, finished, State}. %%-------------------------------------------------------------------- %% Function: %% handle_event(Event, StateName, State) -> {next_state, NextStateName, %% NextState} | %% {next_state, NextStateName, %% NextState, Timeout} | %% {stop, Reason, NewState} %% Description: Whenever a gen_fsm receives an event sent using %% gen_fsm:send_all_state_event/2, this function is called to handle %% the event. %%-------------------------------------------------------------------- handle_event(_Event, StateName, State) -> {next_state, StateName, State}. %%-------------------------------------------------------------------- %% Function: %% handle_sync_event(Event, From, StateName, %% State) -> {next_state, NextStateName, NextState} | %% {next_state, NextStateName, NextState, %% Timeout} | %% {reply, Reply, NextStateName, NextState}| %% {reply, Reply, NextStateName, NextState, %% Timeout} | %% {stop, Reason, NewState} | %% {stop, Reason, Reply, NewState} %% Description: Whenever a gen_fsm receives an event sent using %% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle %% the event. %%-------------------------------------------------------------------- handle_sync_event(_Event, _From, StateName, State) -> Reply = ok, {reply, Reply, StateName, State}. %%-------------------------------------------------------------------- %% Function: %% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}| %% {next_state, NextStateName, NextState, %% Timeout} | %% {stop, Reason, NewState} %% Description: This function is called by a gen_fsm when it receives any %% other message than a synchronous or asynchronous event %% (or a system message). %%-------------------------------------------------------------------- handle_info(_Info, StateName, State) -> {next_state, StateName, State}. %%-------------------------------------------------------------------- %% Function: terminate(Reason, StateName, State) -> void() %% Description:This function is called by a gen_fsm when it is about %% to terminate. It should be the opposite of Module:init/1 and do any %% necessary cleaning up. When it returns, the gen_fsm terminates with %% Reason. The return value is ignored. %%-------------------------------------------------------------------- terminate(_Reason, _StateName, _State) -> ok. %%-------------------------------------------------------------------- %% Function: %% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState} %% Description: Convert process state when code is changed %%-------------------------------------------------------------------- code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- start_operation(Func = #func{}, Mode, Receiver) -> ebb_func_fsm:start_link(Func, Mode, Receiver); start_operation(Value = #value{}, _Mode, Receiver) -> ebb_value_fsm:start_link(Value, Receiver); start_operation(Pipe = #pipe{}, Mode, Receiver) -> ebb_pipe_fsm:start_link( Pipe, fun(Op, Ret) -> start_operation(Op, Mode, Ret) end, Receiver); start_operation(Par = #par{}, Mode, Receiver) -> ebb_par_fsm:start_link( Par, fun(Op, Ret) -> start_operation(Op, Mode, Ret) end, Receiver); start_operation(Route = #route{}, _Mode, Receiver) -> ebb_route_fsm:start_link(Route, Receiver); start_operation(Sync = #sync{}, _Mode, Receiver) -> ebb_sync_fsm:start_link(Sync, Receiver); start_operation(Split = #split{}, _Mode, Receiver) -> ebb_split_fsm:start_link(Split, Receiver); start_operation(Merge = #merge{}, _Mode, Receiver) -> ebb_merge_fsm:start_link(Merge, Receiver); start_operation(Switch = #switch{}, Mode, Receiver) -> ebb_switch_fsm:start_link( Switch, fun(Op, Ret) -> start_operation(Op, Mode, Ret) end, Receiver). is_done(#state{out_arity=Arity, output=Output}) -> ?DICT:size(Output) == Arity. format_output(Output) -> [ V || {_, V} <- lists:keysort(1, ?DICT:to_list(Output)) ]. send_out_requests(Requests, Output) -> Out = format_output(Output), lists:foreach(fun(From) -> gen_fsm:reply(From, Out) end, Requests).