ebb_run.erl 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. -module(ebb_run).
  2. -include("../include/ebb_prim.hrl").
  3. %%% Evaulation
  4. -export([linearize/1, run_linear/2, run_linear/1]).
  5. -export([run/3, run/2]).
  6. -export([localize/1, run_local/2, run_local/1]).
  7. -export([run_distributed/2, run_distributed/1]).
  8. %%% Simplification
  9. -export([simpl/1]).
  10. -define(DICT, orddict).
  11. %%%-----------------------------------------------------------------------------
  12. %%% Evaluation
  13. %%%-----------------------------------------------------------------------------
  14. %%% Linearized execution
  15. linearize(Op) ->
  16. fun(Args) -> run_linear(Op, Args) end.
  17. run_linear(Op) ->
  18. run_linear(Op, []).
  19. run_linear(Op, Args) ->
  20. case ebb_prim:is_operation(Op) andalso
  21. ebb_prim:in_arity(Op) == length(Args) of
  22. true -> do_run_linear(Op, Args);
  23. false -> erlang:error(badarg, [Op, Args])
  24. end.
  25. do_run_linear(#value{value=X}, []) ->
  26. list_output(X);
  27. do_run_linear(#func{code=F}, Args) ->
  28. [apply(F, Args)];
  29. do_run_linear(#pipe{ops=Ops}, Args) ->
  30. lists:foldl(fun do_run_linear/2, Args, Ops);
  31. do_run_linear(#par{ops=Ops}, Args) ->
  32. {Result, []} =
  33. lists:foldl(
  34. fun(Op, {Results, Remaining}) ->
  35. {Args1, Rest} = lists:split(ebb_prim:in_arity(Op), Remaining),
  36. {[do_run_linear(Op, Args1) | Results], Rest}
  37. end,
  38. {[], Args}, Ops),
  39. lists:append(lists:reverse(Result));
  40. do_run_linear(#route{map=M}, Args) ->
  41. [ lists:nth(I, Args) || I <- M ];
  42. do_run_linear(#sync{}, Args) ->
  43. Args;
  44. do_run_linear(#split{}, Args) ->
  45. Args;
  46. do_run_linear(#merge{}, Args) ->
  47. Args;
  48. do_run_linear(#switch{map=M}, [Tag | Args]) ->
  49. do_run_linear(element(2, lists:keyfind(Tag, 1, M)), Args).
  50. list_output(X) ->
  51. if is_list(X) -> X;
  52. is_tuple(X) -> tuple_to_list(X);
  53. true -> [X]
  54. end.
  55. %%% Local execution
  56. localize(Op) ->
  57. fun(Args) -> run_local(Op, Args) end.
  58. run(Mode, Op) ->
  59. run(Mode, Op, []).
  60. run(Mode, Op, Args) ->
  61. N = length(Args),
  62. case ebb_prim:is_operation(Op) andalso ebb_prim:in_arity(Op) == N of
  63. true ->
  64. case ebb_operation_fsm:start_link(Op, Mode) of
  65. {ok, Pid} -> ebb_operation_fsm:send_in(Pid, Args),
  66. Result = ebb_operation_fsm:return_out(Pid),
  67. ebb_operation_fsm:cleanup(Pid),
  68. {ok, Result};
  69. Error -> Error
  70. end;
  71. false -> erlang:error(badarg, [Op, Args])
  72. end.
  73. run_local(Op) ->
  74. run(local, Op).
  75. run_local(Op, Args) ->
  76. run(local, Op, Args).
  77. run_distributed(Op) ->
  78. run(distributed, Op).
  79. run_distributed(Op, Args) ->
  80. run(distributed, Op, Args).
  81. %%%-----------------------------------------------------------------------------
  82. %%% Simplification
  83. %%%-----------------------------------------------------------------------------
  84. simpl(Pipe = #pipe{ops=Inside}) ->
  85. flatten(Pipe#pipe{ops=simpl_pipe([ simpl(O) || O <- Inside ])});
  86. simpl(Par = #par{ops=Inside}) ->
  87. flatten(Par#par{ops=simpl_par([ simpl(O) || O <- Inside ])});
  88. simpl(Switch = #switch{map=Map}) ->
  89. Switch#switch{map=[ {Tag, simpl(Op)} || {Tag, Op} <- Map ]};
  90. simpl(Op) ->
  91. case ebb_prim:is_operation(Op) of
  92. true -> Op;
  93. false -> erlang:error(badarg, [Op])
  94. end.
  95. simpl_pipe(Ops) ->
  96. lists:reverse(lists:foldl(fun simpl_pipe/2, [], Ops)).
  97. simpl_pipe(#pipe{ops=Ops}, Acc) ->
  98. lists:reverse(Ops, Acc);
  99. simpl_pipe(Par = #par{}, [Any | Acc]) ->
  100. simpl_pipe_par(Any, Par) ++ Acc;
  101. simpl_pipe(R2 = #route{in=N}, Acc) ->
  102. case {R2 == ebb_flow:id(N), Acc} of
  103. {true, _} -> Acc;
  104. {false, [R1 = #route{} | Rest]} ->
  105. R3 = #route{in=N3} = simpl_pipe_route(R1, R2),
  106. case R3 == ebb_flow:id(N3) of
  107. true -> Rest;
  108. false -> [R3 | Rest]
  109. end;
  110. {false, _} -> [R2 | Acc]
  111. end;
  112. simpl_pipe(#split{size=N}, [#merge{size=N} | Acc]) ->
  113. Acc;
  114. simpl_pipe(#merge{size=N}, [#split{size=N} | Acc]) ->
  115. Acc;
  116. simpl_pipe(Op, Acc) ->
  117. [Op | Acc].
  118. simpl_pipe_par(Par1 = #par{ops=Ops1}, Par2 = #par{ops=Ops2}) ->
  119. case zip_pars(Ops1, Ops2) of
  120. [{Ops1, Ops2}] -> [Par2, Par1];
  121. Zip -> [permute_pipe_par(Zip)]
  122. end;
  123. simpl_pipe_par(Any, Par = #par{ops=Ops}) ->
  124. case zip_pars([Any], Ops) of
  125. [{Any, Ops}] -> [Any, Par];
  126. Zip -> [permute_pipe_par(Zip)]
  127. end.
  128. permute_pipe_par(Zip) ->
  129. ebb_prim:par(
  130. lists:flatten(
  131. [ case {O1, O2} of
  132. {[_|_], [_|_]} ->
  133. [simpl(ebb_prim:pipe(
  134. [ flatten(ebb_prim:par(X))
  135. || X <- [O1, O2] ]))];
  136. {_, _} -> [O1, O2]
  137. end
  138. || {O1, O2} <- Zip ])).
  139. zip_pars(Par1, Par2) ->
  140. zip_pars(1, Par1, [], 1, Par2, [], []).
  141. zip_pars(_, _Par1 = [], Prev1,
  142. _, Par2, Prev2,
  143. Acc) ->
  144. lists:reverse([ {lists:reverse(Prev1), lists:reverse(Prev2, Par2)} | Acc ]);
  145. zip_pars(_, Par1, Prev1,
  146. _, _Par2 = [], Prev2,
  147. Acc) ->
  148. lists:reverse([ {lists:reverse(Prev1, Par1), lists:reverse(Prev2)} | Acc ]);
  149. zip_pars(M, _Par1 = [Op1|Rest1], Prev1,
  150. N, _Par2 = [Op2|Rest2], Prev2,
  151. Acc) ->
  152. case {M+ebb_prim:out_arity(Op1), N+ebb_prim:in_arity(Op2), Prev2} of
  153. {_, N, []} ->
  154. zip_pars(M, [Op1|Rest1], Prev1,
  155. N, Rest2, [],
  156. [ {[], Op2} | Acc ]);
  157. {I, I, _} ->
  158. zip_pars(I, Rest1, [],
  159. I, Rest2, [],
  160. [ {lists:reverse([Op1|Prev1]), lists:reverse([Op2|Prev2])}
  161. | Acc ]);
  162. {M2, N2, _} when M2 < N2 ->
  163. zip_pars(M2, Rest1, [Op1|Prev1],
  164. N, [Op2|Rest2], Prev2,
  165. Acc);
  166. {M2, N2, _} when N2 < M2 ->
  167. zip_pars(M, [Op1|Rest1], Prev1,
  168. N2, Rest2, [Op2|Prev2],
  169. Acc)
  170. end.
  171. simpl_pipe_route(#route{in=In, map=M1}, #route{map=M2}) ->
  172. ebb_prim:route(In, [ lists:nth(I, M1) || I <- M2 ]).
  173. simpl_par(Ops) ->
  174. lists:reverse(lists:foldl(fun simpl_par/2, [], Ops)).
  175. simpl_par(#par{ops=Ops}, Acc) ->
  176. lists:reverse(Ops, Acc);
  177. simpl_par(#route{in=0, out=0, map=[]}, Acc) ->
  178. Acc;
  179. simpl_par(R2 = #route{}, [R1 = #route{} | Acc]) ->
  180. [simpl_par_route(R1, R2) | Acc];
  181. simpl_par(Op, Acc) ->
  182. [Op | Acc].
  183. simpl_par_route(#route{in=In1, map=M1}, #route{in=In2, map=M2}) ->
  184. ebb_prim:route(In1+In2,
  185. lists:append(M1, [ S2+In1 || S2 <- M2 ])).
  186. flatten(#pipe{in=N, out=N, ops=[]}) ->
  187. ebb_flow:id(N);
  188. flatten(#pipe{ops=[Single]}) ->
  189. Single;
  190. flatten(#par{in=N, out=N, ops=[]}) ->
  191. ebb_flow:id(N);
  192. flatten(#par{ops=[Single]}) ->
  193. Single;
  194. flatten(Op) ->
  195. Op.