-module(ebb_run).

-include("../include/ebb_prim.hrl").

%%% Evaluation
-export([linearize/1, run_linear/2, run_linear/1]).
-export([run/3, run/2]).
-export([localize/1, run_local/2, run_local/1]).
-export([run_distributed/2, run_distributed/1]).

%%% Simplification
-export([simpl/1]).

-define(DICT, orddict).

%%%-----------------------------------------------------------------------------
%%% Evaluation
%%%-----------------------------------------------------------------------------

%%% Linearized execution

%% Returns a fun of one argument (an argument list) that evaluates Op on
%% that list with run_linear/2.
linearize(Op) ->
    fun(Args) -> run_linear(Op, Args) end.

%% Evaluates Op with an empty argument list.
run_linear(Op) ->
    run_linear(Op, []).

%% Evaluates Op on Args in the calling process and returns the list of
%% outputs.
%%
%% Raises error:badarg (with [Op, Args] attached) unless
%% ebb_prim:is_operation(Op) holds and ebb_prim:in_arity(Op) equals
%% length(Args).
run_linear(Op, Args) ->
    case ebb_prim:is_operation(Op) andalso
         ebb_prim:in_arity(Op) == length(Args) of
        true  -> do_run_linear(Op, Args);
        false -> erlang:error(badarg, [Op, Args])
    end.

%% Interprets one operation record on its argument list.  Every clause
%% returns a list of outputs.
do_run_linear(#value{value=X}, []) ->
    list_output(X);
do_run_linear(#func{code=F}, Args) ->
    %% A function always contributes exactly one output.
    [apply(F, Args)];
do_run_linear(#pipe{ops=Ops}, Args) ->
    %% The outputs of each stage become the arguments of the next one.
    lists:foldl(fun do_run_linear/2, Args, Ops);
do_run_linear(#par{ops=Ops}, Args) ->
    %% Each branch consumes the next in_arity(Op) arguments.  Matching the
    %% remainder against [] asserts that every argument was consumed.
    {Result, []} =
        lists:foldl(
          fun(Op, {Results, Remaining}) ->
                  {Args1, Rest} = lists:split(ebb_prim:in_arity(Op), Remaining),
                  {[do_run_linear(Op, Args1) | Results], Rest}
          end,
          {[], Args},
          Ops),
    %% Results were accumulated newest-first; restore branch order.
    lists:append(lists:reverse(Result));
do_run_linear(#route{map=M}, Args) ->
    %% Output K is the argument at the (1-based) position given by M's K-th
    %% entry.
    [ lists:nth(I, Args) || I <- M ];
do_run_linear(#sync{}, Args) ->
    Args;
do_run_linear(#split{}, Args) ->
    Args;
do_run_linear(#merge{}, Args) ->
    Args;
do_run_linear(Switch = #switch{map=M}, [Tag | Args]) ->
    %% The first argument selects the branch; the rest are passed to it.
    case lists:keyfind(Tag, 1, M) of
        false ->
            %% Unknown tag: still error:badarg (as element(2, false) used to
            %% raise), but now carrying the operation and its arguments.
            erlang:error(badarg, [Switch, [Tag | Args]]);
        Branch ->
            do_run_linear(element(2, Branch), Args)
    end.

%% Normalises a value into an output list: lists are returned unchanged,
%% tuples are converted element-wise, anything else becomes a singleton list.
list_output(X) when is_list(X)  -> X;
list_output(X) when is_tuple(X) -> tuple_to_list(X);
list_output(X)                  -> [X].

%%% Local execution

%% Returns a fun of one argument (an argument list) that evaluates Op on
%% that list with run_local/2.
localize(Op) ->
    fun(Args) -> run_local(Op, Args) end.

%% Runs Op in the given Mode with an empty argument list.
run(Mode, Op) ->
    run(Mode, Op, []).
%% Runs Op on Args through an ebb_operation_fsm process started in Mode.
%%
%% Returns {ok, Result}, where Result is what
%% ebb_operation_fsm:return_out/1 yields, or whatever non-{ok, Pid} term
%% ebb_operation_fsm:start_link/2 returned.
%%
%% Raises error:badarg (with [Op, Args] attached) unless
%% ebb_prim:is_operation(Op) holds and ebb_prim:in_arity(Op) equals
%% length(Args).
run(Mode, Op, Args) ->
    N = length(Args),
    case ebb_prim:is_operation(Op) andalso ebb_prim:in_arity(Op) == N of
        true  -> start_and_collect(Mode, Op, Args);
        false -> erlang:error(badarg, [Op, Args])
    end.

%% Starts the fsm, feeds it Args and collects its output.
start_and_collect(Mode, Op, Args) ->
    case ebb_operation_fsm:start_link(Op, Mode) of
        {ok, Pid} ->
            try
                ebb_operation_fsm:send_in(Pid, Args),
                {ok, ebb_operation_fsm:return_out(Pid)}
            after
                %% Run cleanup even when send_in/return_out raise, so the
                %% started fsm is not left behind on the failure path.
                ebb_operation_fsm:cleanup(Pid)
            end;
        Error ->
            Error
    end.

%% Shorthand for run(local, Op).
run_local(Op) ->
    run(local, Op).

%% Shorthand for run(local, Op, Args).
run_local(Op, Args) ->
    run(local, Op, Args).

%% Shorthand for run(distributed, Op).
run_distributed(Op) ->
    run(distributed, Op).

%% Shorthand for run(distributed, Op, Args).
run_distributed(Op, Args) ->
    run(distributed, Op, Args).

%%%-----------------------------------------------------------------------------
%%% Simplification
%%%-----------------------------------------------------------------------------

%% Simplifies an operation tree bottom-up.
%%
%% Pipes and pars have their children simplified first, are then rewritten
%% by simpl_pipe/1 or simpl_par/1, and are finally collapsed by flatten/1.
%% Switches have every branch simplified.  Any other operation is returned
%% unchanged.  Raises error:badarg if the term is not an operation
%% according to ebb_prim:is_operation/1.
simpl(Pipe = #pipe{ops=Inside}) ->
    flatten(Pipe#pipe{ops=simpl_pipe([ simpl(O) || O <- Inside ])});
simpl(Par = #par{ops=Inside}) ->
    flatten(Par#par{ops=simpl_par([ simpl(O) || O <- Inside ])});
simpl(Switch = #switch{map=Map}) ->
    Switch#switch{map=[ {Tag, simpl(Op)} || {Tag, Op} <- Map ]};
simpl(Op) ->
    case ebb_prim:is_operation(Op) of
        true  -> Op;
        false -> erlang:error(badarg, [Op])
    end.

%% Rewrites the stages of a pipe.  Stages are folded left to right onto a
%% reversed accumulator, so the head of the accumulator is always the stage
%% that immediately precedes the one being examined.
simpl_pipe(Ops) ->
    lists:reverse(lists:foldl(fun simpl_pipe/2, [], Ops)).

%% Fold step for simpl_pipe/1: (Stage, ReversedAcc) -> ReversedAcc.
simpl_pipe(#pipe{ops=Ops}, Acc) ->
    %% Splice the stages of a nested pipe into the enclosing one.
    lists:reverse(Ops, Acc);
simpl_pipe(Par = #par{}, [Any | Acc]) ->
    %% Try to combine a par with the stage in front of it.
    simpl_pipe_par(Any, Par) ++ Acc;
simpl_pipe(R2 = #route{in=N}, Acc) ->
    case {R2 == ebb_flow:id(N), Acc} of
        {true, _} ->
            %% A route equal to ebb_flow:id(N) is dropped.
            Acc;
        {false, [R1 = #route{} | Rest]} ->
            %% Two consecutive routes compose into one; drop the result
            %% too if it equals ebb_flow:id/1 of its input size.
            R3 = #route{in=N3} = simpl_pipe_route(R1, R2),
            case R3 == ebb_flow:id(N3) of
                true  -> Rest;
                false -> [R3 | Rest]
            end;
        {false, _} ->
            [R2 | Acc]
    end;
simpl_pipe(#split{size=N}, [#merge{size=N} | Acc]) ->
    %% A merge directly followed by a split of the same size cancels out.
    Acc;
simpl_pipe(#merge{size=N}, [#split{size=N} | Acc]) ->
    %% A split directly followed by a merge of the same size cancels out.
    Acc;
simpl_pipe(Op, Acc) ->
    [Op | Acc].
%% Combines a pipe stage with the par that directly follows it.
%%
%% Returns a list in reversed-accumulator order (latest stage first) that
%% simpl_pipe/2 prepends to its accumulator: either the two stages
%% untouched, or a single par produced by permute_pipe_par/1.
%%
%% zip_pars/2 appends a trailing {[], []} group whenever both sides run out
%% together, so "everything ended up in one group" has to be recognised
%% with and without that trailer.  When that single group has two or more
%% ops on the right-hand side, permute_pipe_par/1 would hand simpl/1 a pipe
%% of the same two stages again, so the pair is returned untouched instead.
%% NOTE(review): this assumes ebb_prim:par/1 and ebb_prim:pipe/1 rebuild
%% #par{}/#pipe{} records around the given ops -- confirm against ebb_prim.
simpl_pipe_par(Par1 = #par{ops=Ops1}, Par2 = #par{ops=Ops2}) ->
    case zip_pars(Ops1, Ops2) of
        [{Ops1, Ops2}] ->
            [Par2, Par1];
        [{Ops1, Ops2 = [_, _ | _]}, {[], []}] ->
            [Par2, Par1];
        Zip ->
            [permute_pipe_par(Zip)]
    end;
simpl_pipe_par(Any, Par = #par{ops=Ops}) ->
    case zip_pars([Any], Ops) of
        %% zip_pars/2 yields lists on both sides of a group, so the
        %% preceding stage shows up as [Any], never as a bare Any.
        [{[Any], Ops = [_, _ | _]} | Rest]
          when Rest =:= []; Rest =:= [{[], []}] ->
            [Par, Any];
        Zip ->
            [permute_pipe_par(Zip)]
    end.

%% Builds one par from the groups produced by zip_pars/2.  A group with ops
%% on both sides becomes a simplified pipe of the left side followed by the
%% right side (each wrapped by ebb_prim:par/1 and collapsed by flatten/1).
%% Any other group contributes its ops unchanged; lists:flatten/1 removes
%% the empty sides and the nesting.
permute_pipe_par(Zip) ->
    ebb_prim:par(
      lists:flatten(
        [ case {O1, O2} of
              {[_|_], [_|_]} ->
                  [simpl(ebb_prim:pipe(
                           [ flatten(ebb_prim:par(X)) || X <- [O1, O2] ]))];
              {_, _} ->
                  [O1, O2]
          end
          || {O1, O2} <- Zip ])).

%% Groups the branches of two consecutive pars so that, within each group,
%% the outputs of the left ops line up with the inputs of the right ops.
%% Returns a list of {LeftOps, RightOps} groups in branch order.
zip_pars(Par1, Par2) ->
    zip_pars(1, Par1, [], 1, Par2, [], []).

%% M and N are running positions, both starting at 1: M advances by the
%% out_arity of ops taken from the left, N by the in_arity of ops taken from
%% the right.  Prev1/Prev2 hold (reversed) the ops taken since the last
%% group was emitted; Acc holds the emitted groups, newest first.
zip_pars(_, _Par1 = [], Prev1, _, Par2, Prev2, Acc) ->
    %% Left side exhausted: whatever is pending or left over forms the
    %% final group (which is {[], []} when nothing is).
    lists:reverse([ {lists:reverse(Prev1), lists:reverse(Prev2, Par2)} | Acc ]);
zip_pars(_, Par1, Prev1, _, _Par2 = [], Prev2, Acc) ->
    %% Right side exhausted: same, mirrored.
    lists:reverse([ {lists:reverse(Prev1, Par1), lists:reverse(Prev2)} | Acc ]);
zip_pars(M, _Par1 = [Op1|Rest1], Prev1, N, _Par2 = [Op2|Rest2], Prev2, Acc) ->
    case {M+ebb_prim:out_arity(Op1), N+ebb_prim:in_arity(Op2), Prev2} of
        {_, N, []} ->
            %% N is already bound, so this matches when Op2 takes no inputs
            %% and nothing is pending on the right: Op2 is emitted alone
            %% (as a bare op, not a list).
            zip_pars(M, [Op1|Rest1], Prev1, N, Rest2, [],
                     [ {[], Op2} | Acc ]);
        {I, I, _} ->
            %% Both positions coincide: close the current group.
            zip_pars(I, Rest1, [], I, Rest2, [],
                     [ {lists:reverse([Op1|Prev1]),
                        lists:reverse([Op2|Prev2])} | Acc ]);
        {M2, N2, _} when M2 < N2 ->
            %% Left is behind: take Op1 and look at the next left op.
            zip_pars(M2, Rest1, [Op1|Prev1], N, [Op2|Rest2], Prev2, Acc);
        {M2, N2, _} when N2 < M2 ->
            %% Right is behind: take Op2 and look at the next right op.
            zip_pars(M, [Op1|Rest1], Prev1, N2, Rest2, [Op2|Prev2], Acc)
    end.

%% Composes two consecutive routes into one: entry K of the result is the
%% entry of the first map found at the position named by entry K of the
%% second map.  The input size is that of the first route.
simpl_pipe_route(#route{in=In, map=M1}, #route{map=M2}) ->
    ebb_prim:route(In, [ lists:nth(I, M1) || I <- M2 ]).

%% Rewrites the branches of a par, folding left to right onto a reversed
%% accumulator.
simpl_par(Ops) ->
    lists:reverse(lists:foldl(fun simpl_par/2, [], Ops)).

%% Fold step for simpl_par/1: (Branch, ReversedAcc) -> ReversedAcc.
simpl_par(#par{ops=Ops}, Acc) ->
    %% Splice the branches of a nested par into the enclosing one.
    lists:reverse(Ops, Acc);
simpl_par(#route{in=0, out=0, map=[]}, Acc) ->
    %% A route with no inputs and no outputs is dropped.
    Acc;
simpl_par(R2 = #route{}, [R1 = #route{} | Acc]) ->
    %% Two adjacent routes are merged into a single route.
    [simpl_par_route(R1, R2) | Acc];
simpl_par(Op, Acc) ->
    [Op | Acc].

%% Merges two side-by-side routes: the second map is appended to the first
%% with each of its entries shifted by the first route's input size.
simpl_par_route(#route{in=In1, map=M1}, #route{in=In2, map=M2}) ->
    ebb_prim:route(In1+In2, lists:append(M1, [ S2+In1 || S2 <- M2 ])).
%% Collapses a degenerate pipe or par:
%%   * no ops and equal in/out sizes -> ebb_flow:id/1 of that size;
%%   * exactly one op                -> that op;
%%   * anything else (including non-pipe/par terms) is returned unchanged.
flatten(#pipe{in=In, out=Out, ops=Ops} = Pipe) ->
    flatten_ops(In, Out, Ops, Pipe);
flatten(#par{in=In, out=Out, ops=Ops} = Par) ->
    flatten_ops(In, Out, Ops, Par);
flatten(Other) ->
    Other.

%% Shared collapsing rule for pipes and pars; Whole is the original
%% container, returned when no rule applies.
flatten_ops(Size, Size, [], _Whole) -> ebb_flow:id(Size);
flatten_ops(_In, _Out, [Only], _Whole) -> Only;
flatten_ops(_In, _Out, _Ops, Whole) -> Whole.