123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- -module(ebb_run).
- -include("../include/ebb_prim.hrl").
- -compile(export_all).
%%% Evaluation
- -export([linearize/1, run_linear/2, run_linear/1]).
- -export([localize/1, run_local/3, run_local/2, run_local/1, collect_results/1]).
- %%% Simplification
- -export([simpl/1]).
- -define(DICT, orddict).
- %%%-----------------------------------------------------------------------------
- %%% Evaluation
- %%%-----------------------------------------------------------------------------
- %%% Linearized execution
%% Wraps Op in a fun of one argument. The fun takes the list of input
%% arguments and evaluates Op on them through run_linear/2.
linearize(Op) ->
    fun(Inputs) ->
        run_linear(Op, Inputs)
    end.
%% Evaluates Op with no input arguments.
run_linear(Op) ->
    run_linear(Op, []).

%% Evaluates Op on the argument list Args in the calling process.
%%
%% Op must satisfy ebb_prim:is_operation/1, Args must be a list, and the
%% number of arguments must equal ebb_prim:in_arity(Op). Any violation
%% raises error:badarg carrying [Op, Args].
%%
%% The is_list/1 check mirrors run_local/3: without it a non-list Args
%% reached length/1 and raised a bare badarg that lost the [Op, Args]
%% context.
run_linear(Op, Args) ->
    Valid = ebb_prim:is_operation(Op)
        andalso is_list(Args)
        andalso ebb_prim:in_arity(Op) == length(Args),
    case Valid of
        true -> do_run_linear(Op, Args);
        false -> erlang:error(badarg, [Op, Args])
    end.
%% Evaluates one operation record on a list of inputs and returns the
%% list of outputs. One clause per operation record type.

%% A value takes no inputs; its stored term is normalised to a list.
do_run_linear(#value{value=Val}, []) ->
    list_output(Val);
%% A func applies its code to the inputs; the result is normalised to a
%% list by list_output/1.
do_run_linear(#func{code=Fun}, Inputs) ->
    Out = apply(Fun, Inputs),
    list_output(Out);
%% A pipe threads the values through its stages in order: the outputs of
%% one stage are the inputs of the next.
do_run_linear(#pipe{ops=Stages}, Inputs) ->
    lists:foldl(fun(Stage, Flowing) -> do_run_linear(Stage, Flowing) end,
                Inputs, Stages);
%% A par hands each branch as many inputs as ebb_prim:in_arity/1 reports
%% for it, in order, and concatenates the branch outputs. The match on []
%% asserts that every input was consumed.
do_run_linear(#par{ops=Branches}, Inputs) ->
    {Reversed, []} = run_par_branches(Branches, Inputs, []),
    lists:append(lists:reverse(Reversed));
%% A route picks inputs by 1-based position, one per entry of its map.
do_run_linear(#route{map=Map}, Inputs) ->
    [ lists:nth(Idx, Inputs) || Idx <- Map ];
%% sync, split and merge pass their inputs through unchanged here.
do_run_linear(#sync{}, Inputs) ->
    Inputs;
do_run_linear(#split{}, Inputs) ->
    Inputs;
do_run_linear(#merge{}, Inputs) ->
    Inputs;
%% A switch uses the first input as a tag, looks the tag up in its map of
%% {Tag, Op} pairs and runs the selected operation on the remaining inputs.
do_run_linear(#switch{map=Cases}, [Tag | Inputs]) ->
    Found = lists:keyfind(Tag, 1, Cases),
    Chosen = element(2, Found),
    do_run_linear(Chosen, Inputs).

%% Runs the par branches left to right. Returns {Outputs, Unused} where
%% Outputs holds one output list per branch in reverse branch order and
%% Unused is whatever inputs were left over.
run_par_branches([], Unused, Outputs) ->
    {Outputs, Unused};
run_par_branches([Branch | More], Pending, Outputs) ->
    {Mine, Others} = lists:split(ebb_prim:in_arity(Branch), Pending),
    run_par_branches(More, Others, [do_run_linear(Branch, Mine) | Outputs]).
%% Normalises a result to a list of outputs: a list is returned as is, a
%% tuple is converted to the list of its elements, and any other term
%% becomes a one-element list.
list_output(X) when is_list(X) ->
    X;
list_output(X) when is_tuple(X) ->
    tuple_to_list(X);
list_output(X) ->
    [X].
- %%% Local execution
%% Wraps Op in a fun of one argument (the argument list). The fun starts
%% the operation through run_local/2 and then collects
%% ebb_prim:out_arity(Op) results with collect_results/1.
%%
%% run_local/2 returns ok on success and the start error otherwise. The
%% result is asserted here so that a failed start crashes with badmatch
%% instead of falling through to a receive that would never complete.
localize(Op) ->
    fun(Args) ->
        ok = run_local(Op, Args),
        collect_results(ebb_prim:out_arity(Op))
    end.
%% Collects Size output values from the calling process's mailbox and
%% returns them as a list ordered by output index.
collect_results(Size) ->
    collect_results(Size, ?DICT:new()).

%% Vals is a ?DICT of the output values gathered so far, keyed by index.
%%
%% Consumes messages of the shape {_, {out, N, Val}}, storing Val under N
%% (a later message for the same N overwrites the earlier value), until
%% Size distinct indices are present. The values are then returned sorted
%% by index.
%%
%% Completion is checked before waiting, so Size = 0 returns []
%% immediately; checking only after a message arrived made that case
%% block forever. The receive has no timeout: the call blocks until
%% enough matching messages have arrived.
collect_results(Size, Vals) ->
    case ?DICT:size(Vals) >= Size of
        true ->
            [ V || {_, V} <- lists:keysort(1, ?DICT:to_list(Vals)) ];
        false ->
            receive
                {_, {out, N, Val}} ->
                    collect_results(Size, ?DICT:store(N, Val, Vals))
            end
    end.
%% Starts Op with no input arguments; the calling process is the receiver.
run_local(Op) ->
    run_local(Op, []).

%% Starts Op on Args; the calling process is the receiver.
run_local(Op, Args) ->
    Caller = self(),
    run_local(Op, Args, Caller).

%% Starts a process for Op via start_local/2, handing it Receiver, and
%% sends it the arguments.
%%
%% Op must satisfy ebb_prim:is_operation/1, Args must be a list, and its
%% length must equal ebb_prim:in_arity(Op); otherwise error:badarg is
%% raised with [Op, Args]. Returns ok once the arguments are sent, or
%% whatever non-{ok, _} term start_local/2 returned.
run_local(Op, Args, Receiver) ->
    Valid = ebb_prim:is_operation(Op)
        andalso is_list(Args)
        andalso ebb_prim:in_arity(Op) == length(Args),
    case Valid of
        false -> erlang:error(badarg, [Op, Args]);
        true -> feed_started(start_local(Op, Receiver), Args)
    end.

%% Sends Args to a successfully started process; passes any other start
%% result back unchanged.
feed_started({ok, Proc}, Args) ->
    send_args(Proc, Args);
feed_started(Err, _Args) ->
    Err.
%% Delivers each argument to Proc with ebb_event:in/3, numbering the
%% arguments from 1 in list order. Always returns ok.
send_args(Proc, Args) ->
    send_args_from(Proc, Args, 1),
    ok.

%% Sends the remaining arguments, the first of them under index N.
send_args_from(_Proc, [], _N) ->
    ok;
send_args_from(Proc, [Arg | Rest], N) ->
    ebb_event:in(Proc, N, Arg),
    send_args_from(Proc, Rest, N + 1).
%% Starts (and links to) the FSM module that corresponds to the
%% operation's record type, passing Receiver along, and returns that
%% module's start_link result.
%%
%% The composite operations (pipe, par, switch) additionally receive
%% fun start_local/2 -- presumably so they can start their nested
%% operations themselves; confirm against the FSM modules.

%% Leaf operations.
start_local(#func{} = Func, Receiver) ->
    ebb_func_fsm:start_link(Func, Receiver);
start_local(#value{} = Value, Receiver) ->
    ebb_value_fsm:start_link(Value, Receiver);

%% Composite operations, given the starter fun.
start_local(#pipe{} = Pipe, Receiver) ->
    Starter = fun start_local/2,
    ebb_pipe_fsm:start_link(Pipe, Starter, Receiver);
start_local(#par{} = Par, Receiver) ->
    Starter = fun start_local/2,
    ebb_par_fsm:start_link(Par, Starter, Receiver);

%% Plumbing operations.
start_local(#route{} = Route, Receiver) ->
    ebb_route_fsm:start_link(Route, Receiver);
start_local(#sync{} = Sync, Receiver) ->
    ebb_sync_fsm:start_link(Sync, Receiver);
start_local(#split{} = Split, Receiver) ->
    ebb_split_fsm:start_link(Split, Receiver);
start_local(#merge{} = Merge, Receiver) ->
    ebb_merge_fsm:start_link(Merge, Receiver);

%% Switch is composite as well.
start_local(#switch{} = Switch, Receiver) ->
    Starter = fun start_local/2,
    ebb_switch_fsm:start_link(Switch, Starter, Receiver).
- %%%-----------------------------------------------------------------------------
- %%% Simplification
- %%%-----------------------------------------------------------------------------
%% Simplifies an operation tree bottom-up.
%%
%% For a pipe or par the nested operations are simplified first, the
%% resulting sequence is rewritten by simpl_pipe/1 or simpl_par/1, and the
%% container is then collapsed by flatten/1 where possible. For a switch
%% only the operation of each {Tag, Op} entry is simplified. Any other
%% term is returned unchanged if ebb_prim:is_operation/1 accepts it and
%% raises error:badarg with [Op] otherwise.
simpl(#pipe{ops=Inside} = Pipe) ->
    Children = lists:map(fun simpl/1, Inside),
    Rewritten = simpl_pipe(Children),
    flatten(Pipe#pipe{ops=Rewritten});
simpl(#par{ops=Inside} = Par) ->
    Children = lists:map(fun simpl/1, Inside),
    Rewritten = simpl_par(Children),
    flatten(Par#par{ops=Rewritten});
simpl(#switch{map=Map} = Switch) ->
    Simplified = [ {Tag, simpl(Op)} || {Tag, Op} <- Map ],
    Switch#switch{map=Simplified};
simpl(Op) ->
    case ebb_prim:is_operation(Op) of
        false -> erlang:error(badarg, [Op]);
        true -> Op
    end.
%% Rewrites the stage list of a pipe. The stages are folded left to right
%% into a reversed accumulator (latest stage at the head), which is
%% reversed again at the end.
simpl_pipe(Ops) ->
    Folded = lists:foldl(fun simpl_pipe/2, [], Ops),
    lists:reverse(Folded).

%% Fold step: combines the next stage with the stages accumulated so far.

%% A nested pipe is spliced in: its stages are pushed onto the accumulator.
simpl_pipe(#pipe{ops=Nested}, Acc) ->
    lists:reverse(Nested, Acc);
%% A par that follows some stage is combined with it by simpl_pipe_par/2.
simpl_pipe(#par{} = Par, [Prev | Older]) ->
    simpl_pipe_par(Prev, Par) ++ Older;
%% A route is dropped when it equals ebb_flow:id/1 of its input size, and
%% otherwise composed with a directly preceding route.
simpl_pipe(#route{in=Width} = Route, Acc) ->
    push_route(Route == ebb_flow:id(Width), Route, Acc);
%% A split right after a merge of the same size cancels, and vice versa.
simpl_pipe(#split{size=Size}, [#merge{size=Size} | Older]) ->
    Older;
simpl_pipe(#merge{size=Size}, [#split{size=Size} | Older]) ->
    Older;
%% Everything else is kept as is.
simpl_pipe(Op, Acc) ->
    [Op | Acc].

%% Pushes Route onto the accumulator. The first argument says whether
%% Route is the identity route for its input size.
push_route(true, _Route, Acc) ->
    Acc;
push_route(false, Route, [#route{} = Prev | Older]) ->
    %% Two consecutive routes become one; if the composition is itself an
    %% identity, both disappear.
    #route{in=Width} = Fused = simpl_pipe_route(Prev, Route),
    case Fused == ebb_flow:id(Width) of
        true -> Older;
        false -> [Fused | Older]
    end;
push_route(false, Route, Acc) ->
    [Route | Acc].
%% Combines a pipe stage (first argument) with the par that directly
%% follows it (second argument). The result is a list in accumulator
%% order, i.e. latest stage first, ready to be prepended to the rest of
%% simpl_pipe/2's accumulator.
%%
%% The two sides are aligned with zip_pars/2. If the alignment is a
%% single pair equal to the untouched inputs, the stages are kept as they
%% are; otherwise the aligned groups are rebuilt by permute_pipe_par/1.
simpl_pipe_par(#par{ops=LeftOps} = Left, #par{ops=RightOps} = Right) ->
    keep_or_permute(zip_pars(LeftOps, RightOps),
                    {LeftOps, RightOps},
                    [Right, Left]);
%% NOTE(review): zip_pars/2 builds the first component of every pair from
%% a list, so a pair equal to {Prev, Ops} with a bare Prev looks
%% unreachable -- confirm whether {[Prev], Ops} was intended. The kept
%% result here is [Prev, Par], whereas the clause above returns the later
%% stage first; verify the intended order before making this reachable.
simpl_pipe_par(Prev, #par{ops=Ops} = Par) ->
    keep_or_permute(zip_pars([Prev], Ops),
                    {Prev, Ops},
                    [Prev, Par]).

%% Returns Unchanged when the zip consists of exactly the pair Untouched,
%% and the permuted par (as a one-element list) otherwise.
keep_or_permute([Untouched], Untouched, Unchanged) ->
    Unchanged;
keep_or_permute(Zip, _Untouched, _Unchanged) ->
    [permute_pipe_par(Zip)].
%% Builds a single par from the aligned groups produced by zip_pars/2.
%%
%% Each group with operations on both sides becomes one branch: a pipe of
%% the (flattened) left par into the (flattened) right par, simplified
%% again. The members of any other group are placed into the new par
%% directly; lists:flatten/1 removes the nesting and the empty lists.
permute_pipe_par(Zip) ->
    Groups = [ permute_group(Before, After) || {Before, After} <- Zip ],
    ebb_prim:par(lists:flatten(Groups)).

%% Turns one aligned group into the branches it contributes.
permute_group([_|_] = Before, [_|_] = After) ->
    Stages = [ flatten(ebb_prim:par(Side)) || Side <- [Before, After] ],
    [simpl(ebb_prim:pipe(Stages))];
permute_group(Before, After) ->
    [Before, After].
%% Aligns the operations of two consecutive pars. Returns a list of
%% {LeftGroup, RightGroup} pairs in order. A right-hand operation with
%% input arity 0 that starts a group is emitted as {[], Op}, with the
%% bare operation rather than a list.
zip_pars(Par1, Par2) ->
    zip_pars(1, Par1, [], 1, Par2, [], []).

%% Arguments: M is the position reached on the left side (advanced by
%% ebb_prim:out_arity/1), N the position reached on the right side
%% (advanced by ebb_prim:in_arity/1). Held1 and Held2 hold the operations
%% of the group currently being built, in reverse order. Acc holds the
%% finished groups, latest first.

%% Left side exhausted: close the result with what is held plus the rest
%% of the right side.
zip_pars(_, [], Held1,
         _, Right, Held2,
         Acc) ->
    Last = {lists:reverse(Held1), lists:reverse(Held2, Right)},
    lists:reverse([Last | Acc]);
%% Right side exhausted: close the result with the rest of the left side
%% plus what is held.
zip_pars(_, Left, Held1,
         _, [], Held2,
         Acc) ->
    Last = {lists:reverse(Held1, Left), lists:reverse(Held2)},
    lists:reverse([Last | Acc]);
zip_pars(M, [Op1 | Rest1] = Left, Held1,
         N, [Op2 | Rest2] = Right, Held2,
         Acc) ->
    OutEnd = M + ebb_prim:out_arity(Op1),
    InEnd = N + ebb_prim:in_arity(Op2),
    case {OutEnd, InEnd, Held2} of
        %% Op2 consumes no inputs and no right group is open: it forms a
        %% group of its own with nothing on the left.
        {_, N, []} ->
            zip_pars(M, Left, Held1,
                     N, Rest2, [],
                     [ {[], Op2} | Acc ]);
        %% Both sides end at the same position: the group is complete.
        {Pos, Pos, _} ->
            Group = {lists:reverse([Op1 | Held1]),
                     lists:reverse([Op2 | Held2])},
            zip_pars(Pos, Rest1, [],
                     Pos, Rest2, [],
                     [ Group | Acc ]);
        %% Left side is behind: take Op1 into the group and advance left.
        {_, _, _} when OutEnd < InEnd ->
            zip_pars(OutEnd, Rest1, [Op1 | Held1],
                     N, Right, Held2,
                     Acc);
        %% Right side is behind: take Op2 into the group and advance right.
        {_, _, _} when InEnd < OutEnd ->
            zip_pars(M, Left, Held1,
                     InEnd, Rest2, [Op2 | Held2],
                     Acc)
    end.
%% Composes two consecutive routes into one. Each entry of the second
%% route's map is used as a 1-based index into the first route's map, and
%% the new route keeps the first route's input size.
simpl_pipe_route(#route{in=In, map=First}, #route{map=Second}) ->
    Composed = [ lists:nth(Idx, First) || Idx <- Second ],
    ebb_prim:route(In, Composed).
%% Rewrites the branch list of a par. The branches are folded left to
%% right into a reversed accumulator (latest branch at the head), which
%% is reversed again at the end.
simpl_par(Ops) ->
    Folded = lists:foldl(fun simpl_par/2, [], Ops),
    lists:reverse(Folded).

%% Fold step: combines the next branch with the branches accumulated so far.

%% A nested par is spliced in: its branches are pushed onto the accumulator.
simpl_par(#par{ops=Nested}, Acc) ->
    lists:reverse(Nested, Acc);
%% A route with no inputs, no outputs and an empty map is dropped.
simpl_par(#route{in=0, out=0, map=[]}, Acc) ->
    Acc;
%% Two adjacent routes are merged into one by simpl_par_route/2.
simpl_par(#route{} = Route, [#route{} = Prev | Older]) ->
    [simpl_par_route(Prev, Route) | Older];
%% Everything else is kept as is.
simpl_par(Op, Acc) ->
    [Op | Acc].
%% Merges two side-by-side routes into one. The input sizes are added, and
%% the second route's map entries are shifted by the first route's input
%% size before being appended to the first route's map.
simpl_par_route(#route{in=LeftIn, map=LeftMap},
                #route{in=RightIn, map=RightMap}) ->
    Shifted = [ Src + LeftIn || Src <- RightMap ],
    ebb_prim:route(LeftIn + RightIn, LeftMap ++ Shifted).
%% Collapses a trivial pipe or par:
%%   - a container holding exactly one operation is replaced by that
%%     operation;
%%   - an empty container whose input and output sizes are the same is
%%     replaced by ebb_flow:id/1 of that size.
%% Any other term is returned unchanged.
flatten(#pipe{ops=[Only]}) ->
    Only;
flatten(#pipe{in=In, out=Out, ops=[]}) when In =:= Out ->
    ebb_flow:id(In);
flatten(#par{ops=[Only]}) ->
    Only;
flatten(#par{in=In, out=Out, ops=[]}) when In =:= Out ->
    ebb_flow:id(In);
flatten(Other) ->
    Other.
|