Sfoglia il codice sorgente

Merge branch 'master' of github.com:aisamanra/ebb

getty 13 anni fa
parent
commit
22cf0d08c8
3 ha cambiato i file con 40 aggiunte e 66 eliminazioni
  1. 5 1
      src/ebb_event.erl
  2. 8 11
      src/ebb_func_fsm.erl
  3. 27 54
      src/ebb_run.erl

+ 5 - 1
src/ebb_event.erl

@@ -1,9 +1,13 @@
 -module(ebb_event).
 
--export([in/3, out/3]).
+-export([in/3, out/3, return/2]).
 
 in(Receiver, N, Val) ->
     gen_fsm:send_event(Receiver, {in, N, Val}).
 
 out(Receiver, N, Val) ->
     gen_fsm:send_event(Receiver, {out, N, Val}).
+
+return(Receiver, Val) ->
+    gen_fsm:send_event(Receiver, {return, Val}).
+

+ 8 - 11
src/ebb_func_fsm.erl

@@ -28,14 +28,11 @@
 %% does not return until Module:init/1 has returned.  
 %%--------------------------------------------------------------------
 start_link(Func = #func{}, Receiver) ->
-    Call = fun(F, Args, Return) ->
-		   spawn(fun() -> return(Return, apply(F, Args)) end)
-	   end,
-    start_link(Func, Call, Receiver).
+    start_link(Func, local, Receiver).
 
-start_link(Func = #func{}, Call, Receiver)
-  when is_function(Call), is_pid(Receiver) ->
-    gen_fsm:start_link(?MODULE, {Func, Call, Receiver}, []).
+start_link(Func = #func{}, Mode, Receiver)
+  when is_pid(Receiver), Mode == local orelse Mode == distributed ->
+    gen_fsm:start_link(?MODULE, {Func, Mode, Receiver}, []).
 
 %%====================================================================
 %% gen_fsm callbacks
@@ -49,7 +46,10 @@ start_link(Func = #func{}, Call, Receiver)
 %% gen_fsm:start_link/3,4, this function is called by the new process to 
 %% initialize. 
 %%--------------------------------------------------------------------
-init({Func, Call, Receiver}) ->
+init({Func, Mode, Receiver}) ->
+    Call = fun(Code, Args, Return) ->
+		   ebb_worker_supv:start_link(Mode, Code, Args, Return)
+	   end,
     State = #state{func=Func, args=?DICT:new(), call=Call,
 		   receiver=Receiver},
     case Func of
@@ -176,6 +176,3 @@ code_change(_OldVsn, StateName, State, _Extra) ->
 
 is_ready(#state{func=#func{in=In}, args=Args}) ->
     ?DICT:size(Args) == In.
-
-return(Receiver, Val) ->
-    gen_fsm:send_event(Receiver, {return, Val}).

+ 27 - 54
src/ebb_run.erl

@@ -1,10 +1,11 @@
 -module(ebb_run).
 -include("../include/ebb_prim.hrl").
--compile(export_all).
 
 %%% Evaulation
 -export([linearize/1, run_linear/2, run_linear/1]).
--export([localize/1, run_local/3, run_local/2, run_local/1, collect_results/1]).
+-export([run/3, run/2]).
+-export([localize/1, run_local/2, run_local/1]).
+-export([run_distributed/2, run_distributed/1]).
 
 %%% Simplification
 -export([simpl/1]).
@@ -63,64 +64,36 @@ list_output(X) ->
 
 %%% Local execution
 localize(Op) ->
-    fun(Args) -> run_local(Op, Args),
-		 collect_results(ebb_prim:out_arity(Op))
-    end.
-
-collect_results(Size) ->
-    collect_results(Size, ?DICT:new()).
-
-collect_results(Size, Vals) ->
-    receive
-	{_, {out, N, Val}} ->
-	    Vals2 = ?DICT:store(N, Val, Vals),
-	    case ?DICT:size(Vals2) == Size of
-		true ->
-		    [ V || {_, V} <- lists:keysort(1, ?DICT:to_list(Vals2)) ];
-		false ->
-		    collect_results(Size, Vals2)
-	    end
+    fun(Args) -> run_local(Op, Args) end.
+
+run(Mode, Op) ->
+    run(Mode, Op, []).
+
+run(Mode, Op, Args) ->
+    N = length(Args),
+    case ebb_prim:is_operation(Op) andalso ebb_prim:in_arity(Op) == N of
+	true ->
+	    case ebb_operation_fsm:start_link(Op, Mode) of
+		{ok, Pid} -> ebb_operation_fsm:send_in(Pid, Args),
+			     Result = ebb_operation_fsm:return_out(Pid),
+			     ebb_operation_fsm:cleanup(Pid),
+			     {ok, Result};
+		Error -> Error
+	    end;
+	false -> erlang:error(badarg, [Op, Args])
     end.
 
 run_local(Op) ->
-    run_local(Op, []).
+    run(local, Op).
 
 run_local(Op, Args) ->
-    run_local(Op, Args, self()).
-
-run_local(Op, Args, Receiver) ->
-    case ebb_prim:is_operation(Op) andalso is_list(Args)
-	andalso ebb_prim:in_arity(Op) == length(Args) of
-	true  -> case start_local(Op, Receiver) of
-		     {ok, Proc} -> send_args(Proc, Args);
-		     Err        -> Err
-		 end;
-	false -> erlang:error(badarg, [Op, Args])
-    end.
+    run(local, Op, Args).
+
+run_distributed(Op) ->
+    run(distributed, Op).
 
-send_args(Proc, Args) ->
-    lists:foldl(fun(Arg, N) -> ebb_event:in(Proc, N, Arg), N+1 end,
-		1, Args),
-    ok.
-
-start_local(Func = #func{}, Receiver) ->
-    ebb_func_fsm:start_link(Func, Receiver);
-start_local(Value = #value{}, Receiver) ->
-    ebb_value_fsm:start_link(Value, Receiver);
-start_local(Pipe = #pipe{}, Receiver) ->
-    ebb_pipe_fsm:start_link(Pipe, fun start_local/2, Receiver);
-start_local(Par = #par{}, Receiver) ->
-    ebb_par_fsm:start_link(Par, fun start_local/2, Receiver);
-start_local(Route = #route{}, Receiver) ->
-    ebb_route_fsm:start_link(Route, Receiver);
-start_local(Sync = #sync{}, Receiver) ->
-    ebb_sync_fsm:start_link(Sync, Receiver);
-start_local(Split = #split{}, Receiver) ->
-    ebb_split_fsm:start_link(Split, Receiver);
-start_local(Merge = #merge{}, Receiver) ->
-    ebb_merge_fsm:start_link(Merge, Receiver);
-start_local(Switch = #switch{}, Receiver) ->
-    ebb_switch_fsm:start_link(Switch, fun start_local/2, Receiver).
+run_distributed(Op, Args) ->
+    run(distributed, Op, Args).
 
 %%%-----------------------------------------------------------------------------
 %%% Simplification