|
@@ -1,10 +1,11 @@
|
|
|
-module(ebb_run).
|
|
|
-include("../include/ebb_prim.hrl").
|
|
|
--compile(export_all).
|
|
|
|
|
|
%%% Evaulation
|
|
|
-export([linearize/1, run_linear/2, run_linear/1]).
|
|
|
--export([localize/1, run_local/3, run_local/2, run_local/1, collect_results/1]).
|
|
|
+-export([run/3, run/2]).
|
|
|
+-export([localize/1, run_local/2, run_local/1]).
|
|
|
+-export([run_distributed/2, run_distributed/1]).
|
|
|
|
|
|
%%% Simplification
|
|
|
-export([simpl/1]).
|
|
@@ -63,64 +64,36 @@ list_output(X) ->
|
|
|
|
|
|
%%% Local execution
|
|
|
localize(Op) ->
|
|
|
- fun(Args) -> run_local(Op, Args),
|
|
|
- collect_results(ebb_prim:out_arity(Op))
|
|
|
- end.
|
|
|
-
|
|
|
-collect_results(Size) ->
|
|
|
- collect_results(Size, ?DICT:new()).
|
|
|
-
|
|
|
-collect_results(Size, Vals) ->
|
|
|
- receive
|
|
|
- {_, {out, N, Val}} ->
|
|
|
- Vals2 = ?DICT:store(N, Val, Vals),
|
|
|
- case ?DICT:size(Vals2) == Size of
|
|
|
- true ->
|
|
|
- [ V || {_, V} <- lists:keysort(1, ?DICT:to_list(Vals2)) ];
|
|
|
- false ->
|
|
|
- collect_results(Size, Vals2)
|
|
|
- end
|
|
|
+ fun(Args) -> run_local(Op, Args) end.
|
|
|
+
|
|
|
+run(Mode, Op) ->
|
|
|
+ run(Mode, Op, []).
|
|
|
+
|
|
|
+run(Mode, Op, Args) ->
|
|
|
+ N = length(Args),
|
|
|
+ case ebb_prim:is_operation(Op) andalso ebb_prim:in_arity(Op) == N of
|
|
|
+ true ->
|
|
|
+ case ebb_operation_fsm:start_link(Op, Mode) of
|
|
|
+ {ok, Pid} -> ebb_operation_fsm:send_in(Pid, Args),
|
|
|
+ Result = ebb_operation_fsm:return_out(Pid),
|
|
|
+ ebb_operation_fsm:cleanup(Pid),
|
|
|
+ {ok, Result};
|
|
|
+ Error -> Error
|
|
|
+ end;
|
|
|
+ false -> erlang:error(badarg, [Op, Args])
|
|
|
end.
|
|
|
|
|
|
run_local(Op) ->
|
|
|
- run_local(Op, []).
|
|
|
+ run(local, Op).
|
|
|
|
|
|
run_local(Op, Args) ->
|
|
|
- run_local(Op, Args, self()).
|
|
|
-
|
|
|
-run_local(Op, Args, Receiver) ->
|
|
|
- case ebb_prim:is_operation(Op) andalso is_list(Args)
|
|
|
- andalso ebb_prim:in_arity(Op) == length(Args) of
|
|
|
- true -> case start_local(Op, Receiver) of
|
|
|
- {ok, Proc} -> send_args(Proc, Args);
|
|
|
- Err -> Err
|
|
|
- end;
|
|
|
- false -> erlang:error(badarg, [Op, Args])
|
|
|
- end.
|
|
|
+ run(local, Op, Args).
|
|
|
+
|
|
|
+run_distributed(Op) ->
|
|
|
+ run(distributed, Op).
|
|
|
|
|
|
-send_args(Proc, Args) ->
|
|
|
- lists:foldl(fun(Arg, N) -> ebb_event:in(Proc, N, Arg), N+1 end,
|
|
|
- 1, Args),
|
|
|
- ok.
|
|
|
-
|
|
|
-start_local(Func = #func{}, Receiver) ->
|
|
|
- ebb_func_fsm:start_link(Func, Receiver);
|
|
|
-start_local(Value = #value{}, Receiver) ->
|
|
|
- ebb_value_fsm:start_link(Value, Receiver);
|
|
|
-start_local(Pipe = #pipe{}, Receiver) ->
|
|
|
- ebb_pipe_fsm:start_link(Pipe, fun start_local/2, Receiver);
|
|
|
-start_local(Par = #par{}, Receiver) ->
|
|
|
- ebb_par_fsm:start_link(Par, fun start_local/2, Receiver);
|
|
|
-start_local(Route = #route{}, Receiver) ->
|
|
|
- ebb_route_fsm:start_link(Route, Receiver);
|
|
|
-start_local(Sync = #sync{}, Receiver) ->
|
|
|
- ebb_sync_fsm:start_link(Sync, Receiver);
|
|
|
-start_local(Split = #split{}, Receiver) ->
|
|
|
- ebb_split_fsm:start_link(Split, Receiver);
|
|
|
-start_local(Merge = #merge{}, Receiver) ->
|
|
|
- ebb_merge_fsm:start_link(Merge, Receiver);
|
|
|
-start_local(Switch = #switch{}, Receiver) ->
|
|
|
- ebb_switch_fsm:start_link(Switch, fun start_local/2, Receiver).
|
|
|
+run_distributed(Op, Args) ->
|
|
|
+ run(distributed, Op, Args).
|
|
|
|
|
|
%%%-----------------------------------------------------------------------------
|
|
|
%%% Simplification
|