ebb_run.erl 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. -module(ebb_run).
  2. -include("../include/ebb_prim.hrl").
  3. -compile(export_all).
  4. %%% Evaulation
  5. -export([linearize/1, run_linear/2, run_linear/1]).
  6. -export([localize/1, run_local/3, run_local/2, run_local/1, collect_results/1]).
  7. %%% Simplification
  8. -export([simpl/1]).
  9. -define(DICT, orddict).
  10. %%%-----------------------------------------------------------------------------
  11. %%% Evaluation
  12. %%%-----------------------------------------------------------------------------
  13. %%% Linearized execution
  14. linearize(Op) ->
  15. fun(Args) -> run_linear(Op, Args) end.
  16. run_linear(Op) ->
  17. run_linear(Op, []).
  18. run_linear(Op, Args) ->
  19. case ebb_prim:is_operation(Op) andalso
  20. ebb_prim:in_arity(Op) == length(Args) of
  21. true -> do_run_linear(Op, Args);
  22. false -> erlang:error(badarg, [Op, Args])
  23. end.
  24. do_run_linear(#value{value=X}, []) ->
  25. list_output(X);
  26. do_run_linear(#func{code=F}, Args) ->
  27. [apply(F, Args)];
  28. do_run_linear(#pipe{ops=Ops}, Args) ->
  29. lists:foldl(fun do_run_linear/2, Args, Ops);
  30. do_run_linear(#par{ops=Ops}, Args) ->
  31. {Result, []} =
  32. lists:foldl(
  33. fun(Op, {Results, Remaining}) ->
  34. {Args1, Rest} = lists:split(ebb_prim:in_arity(Op), Remaining),
  35. {[do_run_linear(Op, Args1) | Results], Rest}
  36. end,
  37. {[], Args}, Ops),
  38. lists:append(lists:reverse(Result));
  39. do_run_linear(#route{map=M}, Args) ->
  40. [ lists:nth(I, Args) || I <- M ];
  41. do_run_linear(#sync{}, Args) ->
  42. Args;
  43. do_run_linear(#split{}, Args) ->
  44. Args;
  45. do_run_linear(#merge{}, Args) ->
  46. Args;
  47. do_run_linear(#switch{map=M}, [Tag | Args]) ->
  48. do_run_linear(element(2, lists:keyfind(Tag, 1, M)), Args).
  49. list_output(X) ->
  50. if is_list(X) -> X;
  51. is_tuple(X) -> tuple_to_list(X);
  52. true -> [X]
  53. end.
  54. %%% Local execution
  55. localize(Op) ->
  56. fun(Args) -> run_local(Op, Args),
  57. collect_results(ebb_prim:out_arity(Op))
  58. end.
  59. collect_results(Size) ->
  60. collect_results(Size, ?DICT:new()).
  61. collect_results(Size, Vals) ->
  62. receive
  63. {_, {out, N, Val}} ->
  64. Vals2 = ?DICT:store(N, Val, Vals),
  65. case ?DICT:size(Vals2) == Size of
  66. true ->
  67. [ V || {_, V} <- lists:keysort(1, ?DICT:to_list(Vals2)) ];
  68. false ->
  69. collect_results(Size, Vals2)
  70. end
  71. end.
  72. run_local(Op) ->
  73. run_local(Op, []).
  74. run_local(Op, Args) ->
  75. run_local(Op, Args, self()).
  76. run_local(Op, Args, Receiver) ->
  77. case ebb_prim:is_operation(Op) andalso is_list(Args)
  78. andalso ebb_prim:in_arity(Op) == length(Args) of
  79. true -> case start_local(Op, Receiver) of
  80. {ok, Proc} -> send_args(Proc, Args);
  81. Err -> Err
  82. end;
  83. false -> erlang:error(badarg, [Op, Args])
  84. end.
  85. send_args(Proc, Args) ->
  86. lists:foldl(fun(Arg, N) -> ebb_event:in(Proc, N, Arg), N+1 end,
  87. 1, Args),
  88. ok.
  89. start_local(Func = #func{}, Receiver) ->
  90. ebb_func_fsm:start_link(Func, Receiver);
  91. start_local(Value = #value{}, Receiver) ->
  92. ebb_value_fsm:start_link(Value, Receiver);
  93. start_local(Pipe = #pipe{}, Receiver) ->
  94. ebb_pipe_fsm:start_link(Pipe, fun start_local/2, Receiver);
  95. start_local(Par = #par{}, Receiver) ->
  96. ebb_par_fsm:start_link(Par, fun start_local/2, Receiver);
  97. start_local(Route = #route{}, Receiver) ->
  98. ebb_route_fsm:start_link(Route, Receiver);
  99. start_local(Sync = #sync{}, Receiver) ->
  100. ebb_sync_fsm:start_link(Sync, Receiver);
  101. start_local(Split = #split{}, Receiver) ->
  102. ebb_split_fsm:start_link(Split, Receiver);
  103. start_local(Merge = #merge{}, Receiver) ->
  104. ebb_merge_fsm:start_link(Merge, Receiver);
  105. start_local(Switch = #switch{}, Receiver) ->
  106. ebb_switch_fsm:start_link(Switch, fun start_local/2, Receiver).
  107. %%%-----------------------------------------------------------------------------
  108. %%% Simplification
  109. %%%-----------------------------------------------------------------------------
  110. simpl(Pipe = #pipe{ops=Inside}) ->
  111. flatten(Pipe#pipe{ops=simpl_pipe([ simpl(O) || O <- Inside ])});
  112. simpl(Par = #par{ops=Inside}) ->
  113. flatten(Par#par{ops=simpl_par([ simpl(O) || O <- Inside ])});
  114. simpl(Switch = #switch{map=Map}) ->
  115. Switch#switch{map=[ {Tag, simpl(Op)} || {Tag, Op} <- Map ]};
  116. simpl(Op) ->
  117. case ebb_prim:is_operation(Op) of
  118. true -> Op;
  119. false -> erlang:error(badarg, [Op])
  120. end.
  121. simpl_pipe(Ops) ->
  122. lists:reverse(lists:foldl(fun simpl_pipe/2, [], Ops)).
  123. simpl_pipe(#pipe{ops=Ops}, Acc) ->
  124. lists:reverse(Ops, Acc);
  125. simpl_pipe(Par = #par{}, [Any | Acc]) ->
  126. simpl_pipe_par(Any, Par) ++ Acc;
  127. simpl_pipe(R2 = #route{in=N}, Acc) ->
  128. case {R2 == ebb_flow:id(N), Acc} of
  129. {true, _} -> Acc;
  130. {false, [R1 = #route{} | Rest]} ->
  131. R3 = #route{in=N3} = simpl_pipe_route(R1, R2),
  132. case R3 == ebb_flow:id(N3) of
  133. true -> Rest;
  134. false -> [R3 | Rest]
  135. end;
  136. {false, _} -> [R2 | Acc]
  137. end;
  138. simpl_pipe(#split{size=N}, [#merge{size=N} | Acc]) ->
  139. Acc;
  140. simpl_pipe(#merge{size=N}, [#split{size=N} | Acc]) ->
  141. Acc;
  142. simpl_pipe(Op, Acc) ->
  143. [Op | Acc].
  144. simpl_pipe_par(Par1 = #par{ops=Ops1}, Par2 = #par{ops=Ops2}) ->
  145. case zip_pars(Ops1, Ops2) of
  146. [{Ops1, Ops2}] -> [Par2, Par1];
  147. Zip -> [permute_pipe_par(Zip)]
  148. end;
  149. simpl_pipe_par(Any, Par = #par{ops=Ops}) ->
  150. case zip_pars([Any], Ops) of
  151. [{Any, Ops}] -> [Any, Par];
  152. Zip -> [permute_pipe_par(Zip)]
  153. end.
  154. permute_pipe_par(Zip) ->
  155. ebb_prim:par(
  156. lists:flatten(
  157. [ case {O1, O2} of
  158. {[_|_], [_|_]} ->
  159. [simpl(ebb_prim:pipe(
  160. [ flatten(ebb_prim:par(X))
  161. || X <- [O1, O2] ]))];
  162. {_, _} -> [O1, O2]
  163. end
  164. || {O1, O2} <- Zip ])).
  165. zip_pars(Par1, Par2) ->
  166. zip_pars(1, Par1, [], 1, Par2, [], []).
  167. zip_pars(_, _Par1 = [], Prev1,
  168. _, Par2, Prev2,
  169. Acc) ->
  170. lists:reverse([ {lists:reverse(Prev1), lists:reverse(Prev2, Par2)} | Acc ]);
  171. zip_pars(_, Par1, Prev1,
  172. _, _Par2 = [], Prev2,
  173. Acc) ->
  174. lists:reverse([ {lists:reverse(Prev1, Par1), lists:reverse(Prev2)} | Acc ]);
  175. zip_pars(M, _Par1 = [Op1|Rest1], Prev1,
  176. N, _Par2 = [Op2|Rest2], Prev2,
  177. Acc) ->
  178. case {M+ebb_prim:out_arity(Op1), N+ebb_prim:in_arity(Op2), Prev2} of
  179. {_, N, []} ->
  180. zip_pars(M, [Op1|Rest1], Prev1,
  181. N, Rest2, [],
  182. [ {[], Op2} | Acc ]);
  183. {I, I, _} ->
  184. zip_pars(I, Rest1, [],
  185. I, Rest2, [],
  186. [ {lists:reverse([Op1|Prev1]), lists:reverse([Op2|Prev2])}
  187. | Acc ]);
  188. {M2, N2, _} when M2 < N2 ->
  189. zip_pars(M2, Rest1, [Op1|Prev1],
  190. N, [Op2|Rest2], Prev2,
  191. Acc);
  192. {M2, N2, _} when N2 < M2 ->
  193. zip_pars(M, [Op1|Rest1], Prev1,
  194. N2, Rest2, [Op2|Prev2],
  195. Acc)
  196. end.
  197. simpl_pipe_route(#route{in=In, map=M1}, #route{map=M2}) ->
  198. ebb_prim:route(In, [ lists:nth(I, M1) || I <- M2 ]).
  199. simpl_par(Ops) ->
  200. lists:reverse(lists:foldl(fun simpl_par/2, [], Ops)).
  201. simpl_par(#par{ops=Ops}, Acc) ->
  202. lists:reverse(Ops, Acc);
  203. simpl_par(#route{in=0, out=0, map=[]}, Acc) ->
  204. Acc;
  205. simpl_par(R2 = #route{}, [R1 = #route{} | Acc]) ->
  206. [simpl_par_route(R1, R2) | Acc];
  207. simpl_par(Op, Acc) ->
  208. [Op | Acc].
  209. simpl_par_route(#route{in=In1, map=M1}, #route{in=In2, map=M2}) ->
  210. ebb_prim:route(In1+In2,
  211. lists:append(M1, [ S2+In1 || S2 <- M2 ])).
  212. flatten(#pipe{in=N, out=N, ops=[]}) ->
  213. ebb_flow:id(N);
  214. flatten(#pipe{ops=[Single]}) ->
  215. Single;
  216. flatten(#par{in=N, out=N, ops=[]}) ->
  217. ebb_flow:id(N);
  218. flatten(#par{ops=[Single]}) ->
  219. Single;
  220. flatten(Op) ->
  221. Op.