瀏覽代碼

Added local concurrent execution to ebb_run module.

Paul Downen 13 年之前
父節點
當前提交
4df381999e
共有 1 個文件被更改,包括 68 次插入2 次删除
  1. 68 2
      src/ebb_run.erl

+ 68 - 2
src/ebb_run.erl

@@ -1,16 +1,21 @@
 -module(ebb_run).
 -include("../include/ebb_prim.hrl").
+-compile(export_all).
 
 %%% Evaulation
 -export([linearize/1, run_linear/2, run_linear/1]).
+-export([localize/1, run_local/3, run_local/2, run_local/1, collect_results/1]).
 
 %%% Simplification
 -export([simpl/1]).
 
+-define(DICT, orddict).
+
 %%%-----------------------------------------------------------------------------
 %%% Evaluation
 %%%-----------------------------------------------------------------------------
 
+%%% Linearized execution
 linearize(Op) ->
     fun(Args) -> run_linear(Op, Args) end.
 
@@ -21,7 +26,7 @@ run_linear(Op, Args) ->
     case ebb_prim:is_operation(Op) andalso
 	ebb_prim:in_arity(Op) == length(Args) of
 	true  -> do_run_linear(Op, Args);
-	false -> erlang:error(badargs, [Op, Args])
+	false -> erlang:error(badarg, [Op, Args])
     end.
 
 do_run_linear(#value{value=X}, []) ->
@@ -55,7 +60,68 @@ list_output(X) ->
        is_tuple(X) -> tuple_to_list(X);
        true -> [X]
     end.
-    
+
+%%% Local execution
+localize(Op) ->
+    fun(Args) -> run_local(Op, Args),
+		 collect_results(ebb_prim:out_arity(Op))
+    end.
+
+collect_results(Size) ->
+    collect_results(Size, ?DICT:new()).
+
+collect_results(Size, Vals) ->
+    receive
+	{_, {out, N, Val}} ->
+	    Vals2 = ?DICT:store(N, Val, Vals),
+	    case ?DICT:size(Vals2) == Size of
+		true ->
+		    [ V || {_, V} <- lists:keysort(1, ?DICT:to_list(Vals2)) ];
+		false ->
+		    collect_results(Size, Vals2)
+	    end
+    end.
+
+run_local(Op) ->
+    run_local(Op, []).
+
+run_local(Op, Args) ->
+    run_local(Op, Args, self()).
+
+run_local(Op, Args, Receiver) ->
+    case ebb_prim:is_operation(Op) andalso is_list(Args)
+	andalso ebb_prim:in_arity(Op) == length(Args) of
+	true  -> case start_local(Op, Receiver) of
+		     {ok, Proc} -> send_args(Proc, Args);
+		     Err        -> Err
+		 end;
+	false -> erlang:error(badarg, [Op, Args])
+    end.
+
+send_args(Proc, Args) ->
+    lists:foldl(fun(Arg, N) -> ebb_event:in(Proc, N, Arg), N+1 end,
+		1, Args),
+    ok.
+
+start_local(Func = #func{}, Receiver) ->
+    ebb_func_fsm:start_link(Func, Receiver);
+start_local(Value = #value{}, Receiver) ->
+    ebb_value_fsm:start_link(Value, Receiver);
+start_local(Pipe = #pipe{}, Receiver) ->
+    ebb_pipe_fsm:start_link(Pipe, fun start_local/2, Receiver);
+start_local(Par = #par{}, Receiver) ->
+    ebb_par_fsm:start_link(Par, fun start_local/2, Receiver);
+start_local(Route = #route{}, Receiver) ->
+    ebb_route_fsm:start_link(Route, Receiver);
+start_local(Sync = #sync{}, Receiver) ->
+    ebb_sync_fsm:start_link(Sync, Receiver);
+start_local(Split = #split{}, Receiver) ->
+    ebb_split_fsm:start_link(Split, Receiver);
+start_local(Merge = #merge{}, Receiver) ->
+    ebb_merge_fsm:start_link(Merge, Receiver);
+start_local(Switch = #switch{}, Receiver) ->
+    ebb_switch_fsm:start_link(Switch, fun start_local/2, Receiver).
+
 %%%-----------------------------------------------------------------------------
 %%% Simplification
 %%%-----------------------------------------------------------------------------