123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- -module(ebb_flow).
- %%% Smart constructors
- -export([func/1, func/2,
- value/1, values/1,
- pipe/1, pipe/2,
- par/1, par/2,
- route/2,
- sync/1, sync/2,
- split/1, split/2,
- merge/1, merge/2,
- switch/1]).
- -export([to_op/1, par_pipes/1, pipe_pars/1]).
- %%% Convenience operations
- -export([id/0, id/1, nop/0, select/2]).
- %%% Parallelism
- -export([fanout/1, fanout/2,
- map/2, map_split/3,
- reduce/2, reduce/3, reduce_split/3,
- map_reduce/3, map_reduce/4, map_reduce_split/4]).
- %%% Sequencing
- -export([seq/1, seq/2, ignore/1]).
- %%% Choice
- -export([cases/1, case_of/2,
- iff/2, if_else/3,
- fanin/1, fanin/2]).
- %%%-----------------------------------------------------------------------------
- %%% Smart constructors
- %%%-----------------------------------------------------------------------------
%% @doc Smart constructor for a constant: hands `Term' straight to
%% `ebb_prim:value/1' and returns whatever that produces.
value(Term) ->
    ebb_prim:value(Term).
%% @doc Wraps every element of `Terms' with `ebb_prim:value/1' and groups the
%% resulting operations with `ebb_prim:par/1'.
values(Terms) ->
    ValueOps = [ ebb_prim:value(Term) || Term <- Terms ],
    ebb_prim:par(ValueOps).
%% @doc Smart constructor for a function operation; delegates to
%% `ebb_prim:func/1'.
func(Fun) ->
    ebb_prim:func(Fun).
%% @doc Builds a two-stage pipe: the function operation for `Fun' followed by
%% `ebb_prim:split(Width)'.
func(Fun, Width) ->
    FunOp = ebb_prim:func(Fun),
    SplitOp = ebb_prim:split(Width),
    ebb_prim:pipe([FunOp, SplitOp]).
%% @doc Builds a pipe from `Xs'.
%%
%% Tried in this order:
%%   - `Xs' is already an operation (per `ebb_prim:is_operation/1'): returned
%%     unchanged;
%%   - `Xs' is a list: every element is converted with to_op/1 and the
%%     results are handed to `ebb_prim:pipe/1';
%%   - `Xs' is a tuple: same as the list case, after `tuple_to_list/1';
%%   - anything else: converted with to_op/1.
%% The operation test comes first so that a term which is both an operation
%% and a tuple or list is never taken apart.
pipe(Xs) ->
    case ebb_prim:is_operation(Xs) of
        true ->
            Xs;
        _ when is_list(Xs) ->
            ebb_prim:pipe([ to_op(Stage) || Stage <- Xs ]);
        _ when is_tuple(Xs) ->
            ebb_prim:pipe([ to_op(Stage) || Stage <- tuple_to_list(Xs) ]);
        _ ->
            to_op(Xs)
    end.
%% @doc Two-argument convenience form: same as `pipe([First, Second])'.
pipe(First, Second) ->
    Stages = [First, Second],
    pipe(Stages).
%% @doc Builds a parallel composition from `Xs'.
%%
%% Tried in this order:
%%   - `Xs' is already an operation (per `ebb_prim:is_operation/1'): returned
%%     unchanged;
%%   - `Xs' is a list: every element is converted with to_op/1 and the
%%     results are handed to `ebb_prim:par/1';
%%   - `Xs' is a tuple: same as the list case, after `tuple_to_list/1';
%%   - anything else: converted with to_op/1.
par(Xs) ->
    IsOperation = ebb_prim:is_operation(Xs),
    if
        IsOperation ->
            Xs;
        is_list(Xs) ->
            ebb_prim:par([ to_op(Branch) || Branch <- Xs ]);
        is_tuple(Xs) ->
            ebb_prim:par([ to_op(Branch) || Branch <- tuple_to_list(Xs) ]);
        true ->
            to_op(Xs)
    end.
%% @doc Two-argument convenience form: same as `par([First, Second])'.
par(First, Second) ->
    Branches = [First, Second],
    par(Branches).
%% @doc Smart constructor for a routing operation; passes `Arity' and
%% `Mapping' unchanged to `ebb_prim:route/2'.
route(Arity, Mapping) ->
    ebb_prim:route(Arity, Mapping).
%% @doc Smart constructor for a sync operation; delegates to
%% `ebb_prim:sync/1'.
sync(Width) ->
    ebb_prim:sync(Width).
%% @doc Pipes `X1' into `X2' with a sync operation in between.
%%
%% Both arguments are converted with to_op/1. The sync operation is sized by
%% the output arity of the first operation.
sync(X1, X2) ->
    Upstream = to_op(X1),
    Downstream = to_op(X2),
    Sync = sync(ebb_prim:out_arity(Upstream)),
    ebb_prim:pipe([Upstream, Sync, Downstream]).
%% @doc Smart constructor for a split operation; delegates to
%% `ebb_prim:split/1'.
split(Width) ->
    ebb_prim:split(Width).
%% @doc Pipes `X1' into `X2' with a split operation in between.
%%
%% Both arguments are converted with to_op/1. Unlike sync/2 and merge/2, the
%% split operation is sized by the input arity of the second operation.
split(X1, X2) ->
    Upstream = to_op(X1),
    Downstream = to_op(X2),
    Split = split(ebb_prim:in_arity(Downstream)),
    ebb_prim:pipe([Upstream, Split, Downstream]).
%% @doc Smart constructor for a merge operation; delegates to
%% `ebb_prim:merge/1'.
merge(Width) ->
    ebb_prim:merge(Width).
%% @doc Pipes `X1' into `X2' with a merge operation in between.
%%
%% Both arguments are converted with to_op/1. The merge operation is sized by
%% the output arity of the first operation.
merge(X1, X2) ->
    Upstream = to_op(X1),
    Downstream = to_op(X2),
    Merge = merge(ebb_prim:out_arity(Upstream)),
    ebb_prim:pipe([Upstream, Merge, Downstream]).
%% @doc Builds a switch from a list of `{Tag, Body}' pairs.
%%
%% Each body is converted with to_op/1 before the list is handed to
%% `ebb_prim:switch/1'. Because the pairs are picked out by a list
%% comprehension pattern, elements of `Branches' that are not 2-tuples are
%% skipped rather than rejected.
switch(Branches) ->
    Converted = [ {Tag, to_op(Body)} || {Tag, Body} <- Branches ],
    ebb_prim:switch(Converted).
%% @doc Coerces an arbitrary term into an operation.
%%
%% Tried in this order:
%%   - already an operation (per `ebb_prim:is_operation/1'): returned as is;
%%   - a fun: wrapped with `ebb_prim:func/1';
%%   - a list: elements converted recursively and combined with
%%     `ebb_prim:par/1';
%%   - a tuple: treated like the list of its elements;
%%   - anything else: wrapped with `ebb_prim:value/1'.
to_op(X) ->
    IsOperation = ebb_prim:is_operation(X),
    if
        IsOperation ->
            X;
        is_function(X) ->
            ebb_prim:func(X);
        is_list(X) ->
            ebb_prim:par([ to_op(Elem) || Elem <- X ]);
        is_tuple(X) ->
            ebb_prim:par([ to_op(Elem) || Elem <- tuple_to_list(X) ]);
        true ->
            ebb_prim:value(X)
    end.
%% @doc Parallel composition whose nested lists/tuples become pipes.
%%
%% Every element of `Args' goes through nested_operation/2 with pipe_pars/1
%% as the continuation, so each nesting level alternates between par and
%% pipe.
par_pipes(Args) ->
    Branches = [ nested_operation(Arg, fun pipe_pars/1) || Arg <- Args ],
    ebb_prim:par(Branches).
%% @doc Pipe whose nested lists/tuples become parallel compositions.
%%
%% Every element of `Args' goes through nested_operation/2 with par_pipes/1
%% as the continuation, so each nesting level alternates between pipe and
%% par.
pipe_pars(Args) ->
    Stages = [ nested_operation(Arg, fun par_pipes/1) || Arg <- Args ],
    ebb_prim:pipe(Stages).
%% Converts one element of a nested par/pipe description.
%%
%% Operations are kept, funs are wrapped with ebb_prim:func/1, and plain
%% terms are wrapped with ebb_prim:value/1. Lists (and tuples, as lists) are
%% handed to `Continue', which builds the next nesting level.
nested_operation(X, Continue) ->
    case ebb_prim:is_operation(X) of
        true ->
            X;
        _ when is_function(X) ->
            ebb_prim:func(X);
        _ when is_list(X) ->
            Continue(X);
        _ when is_tuple(X) ->
            Continue(tuple_to_list(X));
        _ ->
            ebb_prim:value(X)
    end.
- %%%-----------------------------------------------------------------------------
- %%% Convenience operations
- %%%-----------------------------------------------------------------------------
%% @doc Single-wire identity: a route of arity 1 with mapping `[1]'.
id() ->
    Mapping = [1],
    route(1, Mapping).
%% @doc `Width'-wire identity: a route of arity `Width' whose mapping is
%% `[1, 2, ..., Width]' (the empty list when `Width' is 0).
id(Width) ->
    Mapping = lists:seq(1, Width),
    route(Width, Mapping).
%% @doc A value operation carrying the atom `nop'.
nop() ->
    value(nop).
%% @doc A route of arity `Width' whose mapping is the single index `Index'.
%% Presumably this forwards only input `Index' out of `Width' inputs;
%% confirm against `ebb_prim:route/2'.
select(Index, Width) ->
    Mapping = [Index],
    route(Width, Mapping).
- %%%-----------------------------------------------------------------------------
- %%% Parallelism
- %%%-----------------------------------------------------------------------------
%% @doc Fan-out with one input: same as `fanout(1, Copies)'.
fanout(Copies) ->
    In = 1,
    fanout(In, Copies).
%% @doc A route of arity `In' with `Copies * In' mapping entries.
%%
%% The mapping cycles through `1..In' `Copies' times: for In = 2 and
%% Copies = 3 it is `[1,2,1,2,1,2]'.
fanout(In, Copies) ->
    Total = Copies * In,
    Mapping = [ (Slot rem In) + 1 || Slot <- lists:seq(0, Total - 1) ],
    route(In, Mapping).
%% @doc Applies `Func' to consecutive groups of the outputs of `List'.
%%
%% Both arguments are converted with to_op/1. If the output arity of the list
%% operation is an exact multiple N of the input arity of the function
%% operation, the result is the list operation piped into N parallel copies
%% of the function operation.
%%
%% Raises `error:badarg' (with `[Func, List]' as arguments) when the arities
%% do not divide evenly, or when the function operation takes no inputs. The
%% latter used to surface as a `badarith' from dividing by zero.
map(Func, List) ->
    FOp = to_op(Func),
    LOp = to_op(List),
    case ebb_prim:in_arity(FOp) of
        0 ->
            %% No group size can be derived from a zero-input function.
            erlang:error(badarg, [Func, List]);
        FIn ->
            LOut = ebb_prim:out_arity(LOp),
            case {LOut div FIn, LOut rem FIn} of
                {N, 0} -> pipe(LOp, par(lists:duplicate(N, FOp)));
                {_, _} -> erlang:error(badarg, [Func, List])
            end
    end.
%% @doc Pipes `List' (converted with to_op/1) into `split(N)' and maps
%% `Func' over the result with map/2.
map_split(N, Func, List) ->
    Source = pipe(to_op(List), split(N)),
    map(Func, Source).
%% @doc Reduces the outputs of `List' with `Func' and no seed value.
%%
%% Both arguments are converted with to_op/1 and passed to reduce_right/2. A
%% `badarg' thrown during that work is re-raised as `error:badarg' carrying
%% this call's arguments.
reduce(Func, List) ->
    try
        Reducer = to_op(Func),
        Source = to_op(List),
        reduce_right(Reducer, Source)
    catch
        throw:badarg ->
            erlang:error(badarg, [Func, List])
    end.
%% @doc Reduces the outputs of `List' with `Func', padding with the seed `Z'.
%%
%% `Z' is converted with to_op/1 and must have input arity 0, otherwise the
%% call fails with `badarg'. The converted arguments are passed to
%% reduce_right/3.
%%
%% A `badarg' thrown during that work is re-raised as `error:badarg'. The
%% argument list attached to the error is `[Func, Z, List]', so it matches
%% this function's arity of three; the seed was previously left out.
reduce(Func, Z, List) ->
    try
        ZOp = to_op(Z),
        case ebb_prim:in_arity(ZOp) of
            0 -> ok;
            _ -> throw(badarg)
        end,
        reduce_right(to_op(Func), ZOp, to_op(List))
    catch
        throw:badarg -> erlang:error(badarg, [Func, Z, List])
    end.
%% @doc Pipes `List' (converted with to_op/1) into `split(N)' and reduces the
%% result with `Func' via reduce/2.
reduce_split(N, Func, List) ->
    Source = pipe(to_op(List), split(N)),
    reduce(Func, Source).
%% Seedless reduction worker behind reduce/2.
%%
%% With FIn the input arity of `FOp' and LOut the output arity of `LOp':
%%   - LOut = 0: nothing to reduce, `LOp' is returned;
%%   - 0 < LOut < FIn: too few values for one application, throws badarg;
%%   - LOut = FIn: one application, `LOp' piped into `FOp';
%%   - LOut = N * FIn: N parallel copies of `FOp' are appended, then recurse;
%%   - LOut = N * FIn + M: the first M outputs pass through id(M) next to the
%%     N copies of `FOp', then recurse.
%%
%% Throws badarg (caught by reduce/2) when `FOp' takes no inputs. Dividing
%% by that arity would otherwise crash with badarith and bypass the
%% caller's badarg reporting.
%%
%% NOTE(review): if `FOp' does not produce fewer outputs than it consumes,
%% the recursion presumably never reaches a terminal case; confirm against
%% the arity rules of ebb_prim.
reduce_right(FOp, LOp) ->
    FIn = ebb_prim:in_arity(FOp),
    LOut = ebb_prim:out_arity(LOp),
    case FIn of
        0 -> throw(badarg);
        _ -> ok
    end,
    case {LOut, LOut div FIn, LOut rem FIn} of
        {0, 0, _} ->
            LOp;
        {_, 0, _} ->
            throw(badarg);
        {_, 1, 0} ->
            pipe(LOp, FOp);
        {_, N, 0} ->
            reduce_right(FOp, pipe(LOp, par(lists:duplicate(N, FOp))));
        {_, N, M} ->
            reduce_right(
              FOp, pipe(LOp, par([id(M) | lists:duplicate(N, FOp)])))
    end.
%% Seeded reduction worker behind reduce/3.
%%
%% With FIn the input arity of `FOp', LOut the output arity of `LOp' and ZOut
%% the output arity of `ZOp', let N = LOut div FIn, Rem = LOut rem FIn and
%% Need = FIn - Rem (the number of values missing to fill one more group):
%%   - LOut = 0: `LOp' is returned as is;
%%   - N = 0 and Need is a multiple of ZOut: `LOp' is padded with
%%     Need div ZOut copies of `ZOp' and piped into a single `FOp';
%%   - N = 1, Rem = 0: `LOp' piped into `FOp';
%%   - Rem = 0: N parallel copies of `FOp' are appended, then recurse;
%%   - Rem > 0 and Need is a multiple of ZOut: `LOp' is padded with copies
%%     of `ZOp', N + 1 parallel copies of `FOp' are appended, then recurse;
%%   - otherwise: throws badarg.
%%
%% Throws badarg (caught by reduce/3) when `FOp' takes no inputs or `ZOp'
%% yields no outputs. Both values are used as divisors below and would
%% otherwise crash with badarith, bypassing the caller's badarg reporting.
%%
%% NOTE(review): if `FOp' does not produce fewer outputs than it consumes,
%% the recursion presumably never reaches a terminal case; confirm against
%% the arity rules of ebb_prim.
reduce_right(FOp, ZOp, LOp) ->
    FIn = ebb_prim:in_arity(FOp),
    LOut = ebb_prim:out_arity(LOp),
    ZOut = ebb_prim:out_arity(ZOp),
    case {FIn, ZOut} of
        {0, _} -> throw(badarg);
        {_, 0} -> throw(badarg);
        {_, _} -> ok
    end,
    N = LOut div FIn,
    Rem = LOut rem FIn,
    Need = FIn - Rem,
    case {LOut, N, Rem, Need div ZOut, Need rem ZOut} of
        {0, 0, _, _, _} ->
            LOp;
        {_, 0, _, Z, 0} ->
            pipe(par([LOp | lists:duplicate(Z, ZOp)]), FOp);
        {_, 1, 0, _, _} ->
            pipe(LOp, FOp);
        {_, _, 0, _, _} ->
            reduce_right(
              FOp, ZOp,
              pipe(LOp, par(lists:duplicate(N, FOp))));
        {_, _, _, Z, 0} ->
            reduce_right(
              FOp, ZOp,
              pipe(par([LOp | lists:duplicate(Z, ZOp)]),
                   par(lists:duplicate(N + 1, FOp))));
        {_, _, _, _, _} ->
            throw(badarg)
    end.
%% @doc Maps `Map' over `List' with map/2, then reduces the result with
%% `Red' via reduce/2.
map_reduce(Map, Red, List) ->
    Mapped = map(Map, List),
    reduce(Red, Mapped).
%% @doc Maps `Map' over `List' with map/2, then reduces the result with
%% `Red' and seed `Z' via reduce/3.
map_reduce(Map, Red, Z, List) ->
    Mapped = map(Map, List),
    reduce(Red, Z, Mapped).
%% @doc Maps `Map' over `List' with map_split/3 (split width `N'), then
%% reduces the result with `Red' via reduce/2.
map_reduce_split(N, Map, Red, List) ->
    Mapped = map_split(N, Map, List),
    reduce(Red, Mapped).
- %%%-----------------------------------------------------------------------------
- %%% Sequencing
- %%%-----------------------------------------------------------------------------
%% @doc Chains the operations in `Xs' one after another using sync/2.
%%
%% For a non-empty list, every element is converted with to_op/1 and the
%% total input arity is taken from `ebb_prim:flatten_arity/1'. The first
%% stage runs the first operation next to an identity for the inputs it does
%% not consume. Each later stage is a par of three parts: an identity over
%% the outputs produced so far, the next operation, and an identity over the
%% inputs still not consumed. Stages are joined with sync/2.
%%
%% Anything that is not a non-empty list is simply converted with to_op/1.
seq(Xs = [_|_]) ->
    Ops = [First | Others] = [ to_op(X) || X <- Xs ],
    {TotalIn, _} = ebb_prim:flatten_arity(Ops),
    FirstOut = ebb_prim:out_arity(First),
    Pending0 = TotalIn - ebb_prim:in_arity(First),
    Step = fun(Op, {Chain, Done, Pending}) ->
                   Done1 = Done + ebb_prim:out_arity(Op),
                   Pending1 = Pending - ebb_prim:in_arity(Op),
                   Stage = par([id(Done), Op, id(Pending1)]),
                   {sync(Chain, Stage), Done1, Pending1}
           end,
    Init = {par(First, id(Pending0)), FirstOut, Pending0},
    {Result, _, _} = lists:foldl(Step, Init, Others),
    Result;
seq(X) ->
    to_op(X).
%% @doc Two-argument convenience form: same as `seq([First, Second])'.
seq(First, Second) ->
    Steps = [First, Second],
    seq(Steps).
%% @doc A route of arity `Width' with an empty mapping, built directly with
%% `ebb_prim:route/2'.
ignore(Width) ->
    NoOutputs = [],
    ebb_prim:route(Width, NoOutputs).
- %%%-----------------------------------------------------------------------------
- %%% Choice
- %%%-----------------------------------------------------------------------------
%% @doc Alias for switch/1: builds a switch from `{Tag, Body}' pairs.
cases(TaggedBranches) ->
    switch(TaggedBranches).
%% @doc Feeds a discriminator into a switch built from `Branches'.
%%
%% The first stage runs `Disc' (converted with to_op/1) next to an identity
%% that is one wire narrower than the switch's input arity. The second stage
%% is the switch itself. The two stages are combined with pipe_pars/1.
case_of(Disc, Branches) ->
    Switch = cases(Branches),
    Selector = to_op(Disc),
    Passthrough = id(ebb_prim:in_arity(Switch) - 1),
    pipe_pars([[Selector, Passthrough], Switch]).
%% @doc Two-way switch: `Then' under tag `true', `Else' under tag `false'.
iff(Then, Else) ->
    Branches = [{true, Then}, {false, Else}],
    cases(Branches).
%% @doc Like iff/2, but with `Test' wired in as the discriminator through
%% case_of/2.
if_else(Test, Then, Else) ->
    Branches = [{true, Then}, {false, Else}],
    case_of(Test, Branches).
%% @doc Switch with one branch per index in `1..Copies'; the branch tagged
%% `Index' is `select(Index, Copies)'.
fanin(Copies) ->
    Branches = [ {Index, select(Index, Copies)}
                 || Index <- lists:seq(1, Copies) ],
    cases(Branches).
%% @doc Switch with one branch per index in `1..Copies' over groups of `Out'
%% wires.
%%
%% The switch body has arity `Copies * Out'. The branch tagged `I' is a route
%% whose mapping is the `I'-th block of `Out' consecutive indices, from
%% `(I-1)*Out + 1' to `I*Out'.
%%
%% route/2 is called locally, like everywhere else in this module, rather
%% than through the fully qualified `ebb_flow:route/2'.
fanin(Copies, Out) ->
    In = Copies * Out,
    cases([ {I, route(In, [ (I - 1) * Out + J || J <- lists:seq(1, Out) ])}
            || I <- lists:seq(1, Copies) ]).
|