Explorar el Código

Added FSM processes to support concurrent operation computation.

Paul Downen hace 13 años
padre
commit
d5fd4bb474

+ 157 - 0
src/ebb_chain_fsm.erl

@@ -0,0 +1,157 @@
+-module(ebb_chain_fsm).
+
+-behaviour(gen_fsm).
+
+%% API
+-export([start_link/2]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+	 terminate/3, code_change/4]).
+
+%% state callbacks
+-export([forwarding/2]).
+
+-define(DICT, orddict).
+
+-record(state, {arity, vals, receiver}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Arity, Receiver) when is_number(Arity), is_pid(Receiver) ->
+    gen_fsm:start_link(?MODULE, {Arity, Receiver}, []).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({Arity, Receiver}) ->
+    State = #state{arity=Arity, vals=?DICT:new(), receiver=Receiver},
+    {ok, forwarding, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+forwarding({out, N, Val},
+	   State = #state{arity=Arity, vals=Vals, receiver=Receiver})
+  when N > 0, N =< Arity ->
+    Vals2 = ?DICT:store(N, Val, Vals),
+    State2 = State#state{vals=Vals2},
+    ebb_event:in(Receiver, N, Val),
+    case is_done(State2) of
+	true  -> {stop, normal, State2};
+	false -> {next_state, forwarding, State2}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+is_done(#state{arity=Arity, vals=Vals}) ->
+  ?DICT:size(Vals) == Arity.

+ 9 - 0
src/ebb_event.erl

@@ -0,0 +1,9 @@
+-module(ebb_event).
+
+-export([in/3, out/3]).
+
+in(Receiver, N, Val) ->
+    gen_fsm:send_event(Receiver, {in, N, Val}).
+
+out(Receiver, N, Val) ->
+    gen_fsm:send_event(Receiver, {out, N, Val}).

+ 181 - 0
src/ebb_func_fsm.erl

@@ -0,0 +1,181 @@
+-module(ebb_func_fsm).
+
+-behaviour(gen_fsm).
+
+-include("../include/ebb_prim.hrl").
+
+%% API
+-export([start_link/2, start_link/3]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+	 terminate/3, code_change/4]).
+
+%% state callbacks
+-export([waiting/2, running/2]).
+
+-define(DICT, orddict).
+
+-record(state, {func, args, call, receiver}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Func = #func{}, Receiver) ->
+    Call = fun(F, Args, Return) ->
+		   spawn(fun() -> return(Return, apply(F, Args)) end)
+	   end,
+    start_link(Func, Call, Receiver).
+
+start_link(Func = #func{}, Call, Receiver)
+  when is_function(Call), is_pid(Receiver) ->
+    gen_fsm:start_link(?MODULE, {Func, Call, Receiver}, []).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({Func, Call, Receiver}) ->
+    State = #state{func=Func, args=?DICT:new(), call=Call,
+		   receiver=Receiver},
+    case Func of
+	#func{in=0,code=Code} ->
+	    Call(Code, [], self()),
+	    {ok, running, State};
+	_ -> {ok, waiting, State}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+waiting({in, N, Arg},
+	State = #state{func=Func = #func{in=In}, args=Args, call=Call})
+  when N > 0, N =< In ->
+    Args2 = ?DICT:store(N, Arg, Args),
+    State2 = State#state{args=Args2},
+    case is_ready(State2) of
+	true  -> VArgs =
+		     [ V || {_, V} <- lists:keysort(1, ?DICT:to_list(Args2)) ],
+		 Call(Func#func.code, VArgs, self()),
+		 {next_state, running, State2};
+	false -> {next_state, waiting, State2}
+    end.
+
+running({return, Value}, State = #state{receiver=Receiver}) ->
+    ebb_event:out(Receiver, 1, Value),
+    {stop, normal, State}.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+is_ready(#state{func=#func{in=In}, args=Args}) ->
+    ?DICT:size(Args) == In.
+
+return(Receiver, Val) ->
+    gen_fsm:send_event(Receiver, {return, Val}).

+ 160 - 0
src/ebb_merge_fsm.erl

@@ -0,0 +1,160 @@
+-module(ebb_merge_fsm).
+
+-behaviour(gen_fsm).
+
+-include("../include/ebb_prim.hrl").
+
+%% API
+-export([start_link/2]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+	 terminate/3, code_change/4]).
+
+%% state callbacks
+-export([waiting/2]).
+
+-define(DICT, orddict).
+
+-record(state, {size, vals, receiver}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Merge = #merge{}, Receiver) when is_pid(Receiver) ->
+    gen_fsm:start_link(?MODULE, {Merge, Receiver}, []).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({#merge{size=Size}, Receiver}) ->
+    State = #state{size=Size, vals=?DICT:new(), receiver=Receiver},
+    {ok, waiting, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+waiting({in, N, Val},
+	State = #state{size=Size, vals=Vals, receiver=Receiver})
+  when N > 0, N =< Size ->
+    Vals2 = ?DICT:store(N, Val, Vals),
+    State2 = State#state{vals=Vals2},
+    case is_done(State2) of
+	true  -> Merged =
+		     [ X || {_, X} <- lists:keysort(1, ?DICT:to_list(Vals)) ],
+		 ebb_event:out(Receiver, 1, Merged),
+		 {stop, normal, State2};
+	false -> {next_state, waiting, State2}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+is_done(#state{size=Size, vals=Vals}) ->
+    ?DICT:size(Vals) == Size.

+ 158 - 0
src/ebb_offset_fsm.erl

@@ -0,0 +1,158 @@
+-module(ebb_offset_fsm).
+
+-behaviour(gen_fsm).
+
+%% API
+-export([start_link/3]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+	 terminate/3, code_change/4]).
+
+%% state callbacks
+-export([forwarding/2]).
+
+-define(DICT, orddict).
+
+-record(state, {arity, vals, offset, receiver}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Arity, Offset, Receiver)
+  when is_number(Arity), is_number(Offset), is_pid(Receiver) ->
+    gen_fsm:start_link(?MODULE, {Arity, Offset, Receiver}, []).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({Arity, Offset, Receiver}) ->
+    {ok, forwarding, #state{arity=Arity, vals=?DICT:new(),
+			    offset=Offset, receiver=Receiver}}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+forwarding({out, N, Val},
+	   State = #state{arity=Arity, vals=Vals,
+			  offset=Offset, receiver=Receiver}) 
+  when N > 0, N =< Arity ->
+    Vals2 = ?DICT:store(N, Val, Vals),
+    State2 = State#state{vals=Vals2},
+    ebb_event:out(Receiver, N+Offset, Val),
+    case is_done(State2) of
+	true  -> {stop, normal, State2};
+	false -> {next_state, forwarding, State2}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+is_done(#state{arity=Arity, vals=Vals}) ->
+    ?DICT:size(Vals) == Arity.

+ 186 - 0
src/ebb_par_fsm.erl

@@ -0,0 +1,186 @@
+-module(ebb_par_fsm).
+
+-behaviour(gen_fsm).
+
+-include("../include/ebb_prim.hrl").
+
+%% API
+-export([start_link/3]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+	 terminate/3, code_change/4]).
+
+-export([forwarding/2]).
+
+-define(DICT, orddict).
+
+-record(state, {par, in_map, input}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Par = #par{}, Run, Receiver) when is_pid(Receiver) ->
+    gen_fsm:start_link(?MODULE, {Par, Run, Receiver}, []).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({Par, Run, Receiver}) ->
+    Fsms = run_ops(Par, Run, Receiver),
+    State = #state{par=Par, in_map=build_in_map(Par, Fsms), input=?DICT:new()},
+    {ok, forwarding, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+forwarding({in, N, Val},
+	   State = #state{par=#par{in=In}, input=Input, in_map=Map})
+  when N > 0, N =< In ->
+    State2 = State#state{input=?DICT:store(N, Val, Input)},
+    {N2, Receiver} = ?DICT:fetch(N, Map),
+    ebb_event:in(Receiver, N2, Val),
+    case is_done(State2) of
+	true  -> {stop, normal, State2};
+	false -> {next_state, forwarding, State2}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+run_ops(#par{ops=Ops}, Run, Receiver) ->
+    {_, Fsms} =
+	lists:foldl(
+	  fun(Op, {Offset, Fsms}) ->
+		  Out = ebb_prim:out_arity(Op),
+		  {ok, Target} =
+		      case Offset of
+			  0 -> {ok, Receiver};
+			  _ -> ebb_offset_fsm:start_link(Out, Offset, Receiver)
+		      end,
+		  case Run(Op, Target) of
+		      {ok, Fsm} -> {Offset+Out, [Fsm | Fsms]};
+		      ignore    -> {Offset+Out, [ignore | Fsms]}
+		  end
+	  end,
+	  {0, []}, Ops),
+    lists:reverse(Fsms).
+
+build_in_map(#par{ops=Ops}, Fsms) ->
+    {_, Map} = lists:foldl(
+		 fun({Op, Fsm}, {Offset, Map}) ->
+			 In = ebb_prim:in_arity(Op),
+			 L = [ {N+Offset, {N, Fsm}} || N <- lists:seq(1, In) ],
+			 {Offset+In, L ++ Map}
+		 end,
+		 {0, []}, lists:zip(Ops, Fsms)),
+    ?DICT:from_list(Map).
+
+is_done(#state{par=#par{in=In}, input=Inputs}) ->
+  (?DICT:size(Inputs) == In).

+ 27 - 0
src/ebb_pipe_fsm.erl

@@ -0,0 +1,27 @@
+-module(ebb_pipe_fsm).
+
+-include("../include/ebb_prim.hrl").
+
+%% API
+-export([start_link/3]).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(#pipe{ops=Ops}, Run, Receiver)
+  when is_function(Run), is_pid(Receiver) ->
+    start_chain(lists:reverse(Ops), Run, Receiver).
+
+start_chain([Op1|Ops], Run, Receiver) ->
+    lists:foldl(fun(Op, {ok, Target}) ->
+			Out = ebb_prim:out_arity(Op),
+			{ok, Link} = ebb_chain_fsm:start_link(Out, Target),
+			Run(Op, Link)
+		end,
+		Run(Op1, Receiver), Ops).

+ 166 - 0
src/ebb_route_fsm.erl

@@ -0,0 +1,166 @@
+-module(ebb_route_fsm).
+
+-behaviour(gen_fsm).
+
+-include("../include/ebb_prim.hrl").
+
+%% API
+-export([start_link/2]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+	 terminate/3, code_change/4]).
+
+%% state callbakcs
+-export([forwarding/2]).
+
+-define(DICT, orddict).
+
+-record(state, {arity, map, vals, receiver}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Route = #route{}, Receiver) when is_pid(Receiver) ->
+    gen_fsm:start_link(?MODULE, {Route, Receiver}, []).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({Route=#route{in=In}, Receiver}) ->
+    State = #state{arity=In, map=build_map(Route),
+		   vals=?DICT:new(), receiver=Receiver},
+    {ok, forwarding, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+forwarding({in, N, Val},
+	   State = #state{arity=Arity, map=Map,
+			  vals=Vals, receiver=Receiver})
+  when N > 0, N =< Arity ->
+    Vals2 = ?DICT:store(N, Val, Vals),
+    State2 = State#state{vals=Vals2},
+    lists:foreach(fun(M) -> ebb_event:out(Receiver, M, Val) end,
+		  ?DICT:fetch(N, Map)),
+    case is_done(State2) of
+	true  -> {stop, normal, State2};
+	false -> {next_state, forwarding, State2}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+build_map(#route{map=RawMap, out=Out}) ->
+    lists:foldl(fun({N, M}, Map) -> ?DICT:append(N, M, Map) end,
+		?DICT:new(),
+		lists:zip(RawMap, lists:seq(1, Out))).
+
+is_done(#state{arity=Arity, vals=Vals}) ->
+    ?DICT:size(Vals) == Arity.

+ 152 - 0
src/ebb_split_fsm.erl

@@ -0,0 +1,152 @@
+-module(ebb_split_fsm).
+
+-behaviour(gen_fsm).
+
+-include("../include/ebb_prim.hrl").
+
+%% API
+-export([start_link/2]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+	 terminate/3, code_change/4]).
+
+%% state callbacks
+-export([waiting/2]).
+
+-record(state, {size, receiver}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Split = #split{}, Receiver) when is_pid(Receiver) ->
+    gen_fsm:start_link(?MODULE, {Split, Receiver}, []).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({#split{size=Size}, Receiver}) ->
+    State = #state{size=Size, receiver=Receiver},
+    {ok, waiting, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+waiting({in, 1, Val}, State = #state{size=Size, receiver=Receiver}) ->
+    List = case {is_list(Val), is_tuple(Val)} of
+	       {true,  false} -> Val;
+	       {false, true}  -> tuple_to_list(Val)
+	   end,
+    lists:foreach(fun({N, X}) -> ebb_event:out(Receiver, N, X) end,
+		  lists:zip(lists:seq(1, Size), List)),
+    {stop, normal, State}.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------

+ 184 - 0
src/ebb_switch_fsm.erl

@@ -0,0 +1,184 @@
+-module(ebb_switch_fsm).
+
+-behaviour(gen_fsm).
+
+-include("../include/ebb_prim.hrl").
+
+%% API
+-export([start_link/3]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+	 terminate/3, code_change/4]).
+
+%% state callbacks
+-export([waiting/2, forwarding/2]).
+
+-define(DICT, orddict).
+
+-record(state, {arity, map, vals, run, receiver}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Switch = #switch{}, Run, Receiver)
+  when is_function(Run), is_pid(Receiver) ->
+    gen_fsm:start_link(?MODULE, {Switch, Run, Receiver}, []).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({#switch{in=Arity, map=Map}, Run, Receiver}) ->
+    State = #state{arity=Arity, map=?DICT:from_list(Map), vals=?DICT:new(),
+		   run=Run, receiver=Receiver},
+    {ok, waiting, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+waiting({in, 1, Tag},
+	State = #state{map=Map, vals=Vals, run=Run, receiver=Receiver}) ->
+    Path = ?DICT:fetch(Tag, Map),
+    {ok, Target} = Run(Path, Receiver),
+    State2 = State#state{receiver=Target},
+    forward_all(Vals, Target),
+    case is_done(State2) of
+	true  -> {stop, normal, State2};
+	false -> {next_state, forwarding, State2}
+    end;
+
+waiting({in, N, Val}, State = #state{arity=Arity, vals=Vals})
+  when N > 0, N =< Arity ->
+    Vals2 = ?DICT:store(N, Val, Vals),
+    State2 = State#state{vals=Vals2},
+    {next_state, waiting, State2}.
+
+forwarding({in, N, Val},
+	   State = #state{arity=Arity, vals=Vals, receiver=Receiver})
+  when N > 1, N =< Arity ->
+    Vals2 = ?DICT:store(N, Val, Vals),
+    State2 = State#state{vals=Vals2},
+    forward(N, Val, Receiver),
+    case is_done(State2) of
+	true  -> {stop, normal, State2};
+	false -> {next_state, forwarding, State2}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+forward_all(Vals, Receiver) ->
+    lists:foreach(fun({N, Val}) -> forward(N, Val, Receiver) end,
+		  ?DICT:to_list(Vals)).
+
+forward(N, Val, Receiver) ->
+    ebb_event:in(Receiver, N-1, Val).
+
+is_done(#state{arity=Arity, vals=Vals}) ->
+    ?DICT:size(Vals) == Arity-1.

+ 161 - 0
src/ebb_sync_fsm.erl

@@ -0,0 +1,161 @@
+-module(ebb_sync_fsm).
+
+-behaviour(gen_fsm).
+
+-include("../include/ebb_prim.hrl").
+
+%% API
+-export([start_link/2]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+	 terminate/3, code_change/4]).
+
+%% state callbacks
+-export([waiting/2]).
+
+-define(DICT, orddict).
+
+-record(state, {arity, vals, receiver}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Sync = #sync{}, Receiver) when is_pid(Receiver) ->
+    gen_fsm:start_link(?MODULE, {Sync, Receiver}, []).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({#sync{size=Arity}, Receiver}) ->
+    State = #state{arity=Arity, vals=?DICT:new(), receiver=Receiver},
+    {ok, waiting, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+waiting({in, N, Val},
+	State = #state{arity=Arity, vals=Vals, receiver=Receiver})
+  when N > 0, N =< Arity ->
+    Vals2 = ?DICT:store(N, Val, Vals),
+    State2 = State#state{vals=Vals2},
+    case is_done(State2) of
+	true  -> lists:foreach(
+		   fun({M, X}) -> ebb_event:out(Receiver, M, X) end,
+		   ?DICT:from_list(Vals2)),
+		 {stop, normal, State2};
+	false -> {next_state, waiting, State2}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+is_done(#state{arity=Arity, vals=Vals}) ->
+    ?DICT:size(Vals) == Arity.
+

+ 137 - 0
src/ebb_value_fsm.erl

@@ -0,0 +1,137 @@
+-module(ebb_value_fsm).
+
+-behaviour(gen_fsm).
+
+-include("../include/ebb_prim.hrl").
+
+%% API
+-export([start_link/2]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3,
+	 handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Value = #value{}, Receiver) ->
+    gen_fsm:start_link(?MODULE, {Value, Receiver}, []).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({#value{value=Value}, Receiver}) ->
+    ebb_event:out(Receiver, 1, Value),
+    ignore.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------