-module(ebb_flow).

%%% Smart constructors
-export([func/1, func/2, value/1, values/1,
         pipe/1, pipe/2, par/1, par/2,
         route/2,
         sync/1, sync/2, split/1, split/2, merge/1, merge/2,
         switch/1]).
-export([to_op/1, par_pipes/1, pipe_pars/1]).

%%% Convenience operations
-export([id/0, id/1, nop/0, select/2]).

%%% Parallelism
-export([fanout/1, fanout/2, map/2, map_split/3,
         reduce/2, reduce/3, reduce_split/3,
         map_reduce/3, map_reduce/4, map_reduce_split/4]).

%%% Sequencing
-export([seq/1, seq/2, ignore/1]).

%%% Choice
-export([cases/1, case_of/2, iff/2, if_else/3, fanin/1, fanin/2]).

%%%-----------------------------------------------------------------------------
%%% Smart constructors
%%%-----------------------------------------------------------------------------

%% @doc Wraps the term `X' as a value operation (delegates to ebb_prim:value/1).
value(X) ->
    ebb_prim:value(X).

%% @doc Wraps every element of the list `Xs' as a value operation and places
%% the resulting operations in parallel.
values(Xs) ->
    Wrapped = [ebb_prim:value(Term) || Term <- Xs],
    ebb_prim:par(Wrapped).

%% @doc Wraps `F' as a function operation (delegates to ebb_prim:func/1).
func(F) ->
    ebb_prim:func(F).

%% @doc Function operation for `F' piped into `ebb_prim:split(N)'.
func(F, N) ->
    FuncOp = ebb_prim:func(F),
    SplitOp = ebb_prim:split(N),
    ebb_prim:pipe([FuncOp, SplitOp]).

%% @doc Builds a pipe from `Xs'.
%%
%% An existing operation is returned untouched. The elements of a list or a
%% tuple are each converted with to_op/1 and piped together in order. Any
%% other term is converted with to_op/1 directly.
pipe(Xs) ->
    case ebb_prim:is_operation(Xs) of
        true ->
            Xs;
        _ when is_list(Xs) ->
            ebb_prim:pipe([to_op(Stage) || Stage <- Xs]);
        _ when is_tuple(Xs) ->
            ebb_prim:pipe([to_op(Stage) || Stage <- tuple_to_list(Xs)]);
        _ ->
            to_op(Xs)
    end.

%% @doc Pipes `X' into `Y'; shorthand for `pipe([X, Y])'.
pipe(X, Y) ->
    pipe([X, Y]).

%% @doc Builds a parallel composition from `Xs'.
%%
%% An existing operation is returned untouched. The elements of a list or a
%% tuple are each converted with to_op/1 and placed in parallel. Any other
%% term is converted with to_op/1 directly.
par(Xs) ->
    case ebb_prim:is_operation(Xs) of
        true ->
            Xs;
        _ when is_list(Xs) ->
            ebb_prim:par([to_op(Branch) || Branch <- Xs]);
        _ when is_tuple(Xs) ->
            ebb_prim:par([to_op(Branch) || Branch <- tuple_to_list(Xs)]);
        _ ->
            to_op(Xs)
    end.

%% @doc Places `X' and `Y' in parallel; shorthand for `par([X, Y])'.
par(X, Y) ->
    par([X, Y]).

%% @doc Delegates to ebb_prim:route/2.
route(N, Map) ->
    ebb_prim:route(N, Map).

%% @doc Delegates to ebb_prim:sync/1.
sync(N) ->
    ebb_prim:sync(N).

%% @doc Pipes `X1' into `X2' through a sync sized to the out-arity of `X1'.
%% Both arguments are converted with to_op/1 first.
sync(X1, X2) ->
    Upstream = to_op(X1),
    Downstream = to_op(X2),
    Barrier = sync(ebb_prim:out_arity(Upstream)),
    ebb_prim:pipe([Upstream, Barrier, Downstream]).

%% @doc Delegates to ebb_prim:split/1.
split(N) ->
    ebb_prim:split(N).

%% @doc Pipes `X1' into `X2' through a split sized to the in-arity of `X2'.
%% Both arguments are converted with to_op/1 first.
split(X1, X2) ->
    Upstream = to_op(X1),
    Downstream = to_op(X2),
    Splitter = split(ebb_prim:in_arity(Downstream)),
    ebb_prim:pipe([Upstream, Splitter, Downstream]).

%% @doc Delegates to ebb_prim:merge/1.
merge(N) ->
    ebb_prim:merge(N).

%% @doc Pipes `X1' into `X2' through a merge sized to the out-arity of `X1'.
%% Both arguments are converted with to_op/1 first.
merge(X1, X2) ->
    Upstream = to_op(X1),
    Downstream = to_op(X2),
    Merger = merge(ebb_prim:out_arity(Upstream)),
    ebb_prim:pipe([Upstream, Merger, Downstream]).
%% @doc Builds a switch from a list of `{Tag, X}' branches; every `X' is
%% converted with to_op/1 before being handed to ebb_prim:switch/1.
switch(Branches) ->
    Tagged = [{Tag, to_op(Body)} || {Tag, Body} <- Branches],
    ebb_prim:switch(Tagged).

%% @doc Converts an arbitrary term into an operation.
%%
%% Operations are returned unchanged, funs become function operations, and
%% lists and tuples become a parallel composition of their (recursively
%% converted) elements. Every other term becomes a value operation.
to_op(X) ->
    case ebb_prim:is_operation(X) of
        true ->
            X;
        _ when is_function(X) ->
            ebb_prim:func(X);
        _ when is_list(X) ->
            ebb_prim:par([to_op(Elem) || Elem <- X]);
        _ when is_tuple(X) ->
            ebb_prim:par([to_op(Elem) || Elem <- tuple_to_list(X)]);
        _ ->
            ebb_prim:value(X)
    end.

%% @doc Parallel composition of `Args' in which nested lists/tuples are
%% converted with pipe_pars/1, so the nesting alternates par, pipe, par, ...
par_pipes(Args) ->
    Nested = fun pipe_pars/1,
    ebb_prim:par([nested_operation(Arg, Nested) || Arg <- Args]).

%% @doc Pipe of `Args' in which nested lists/tuples are converted with
%% par_pipes/1, so the nesting alternates pipe, par, pipe, ...
pipe_pars(Args) ->
    Nested = fun par_pipes/1,
    ebb_prim:pipe([nested_operation(Arg, Nested) || Arg <- Args]).

%% Converts one element for par_pipes/1 and pipe_pars/1. Same dispatch as
%% to_op/1, except that lists and tuples (as lists) are passed to `Continue'.
nested_operation(X, Continue) ->
    case ebb_prim:is_operation(X) of
        true ->
            X;
        _ when is_function(X) ->
            ebb_prim:func(X);
        _ when is_list(X) ->
            Continue(X);
        _ when is_tuple(X) ->
            Continue(tuple_to_list(X));
        _ ->
            ebb_prim:value(X)
    end.

%%%-----------------------------------------------------------------------------
%%% Convenience operations
%%%-----------------------------------------------------------------------------

%% @doc Identity on a single input: `route(1, [1])'.
id() ->
    route(1, [1]).

%% @doc Identity on `N' inputs: routes inputs 1..N to outputs in that order.
id(N) ->
    Positions = lists:seq(1, N),
    route(N, Positions).

%% @doc Value operation wrapping the atom `nop'.
nop() ->
    value(nop).

%% @doc Routes `N' inputs to the single entry `I': `route(N, [I])'.
select(I, N) ->
    route(N, [I]).

%%%-----------------------------------------------------------------------------
%%% Parallelism
%%%-----------------------------------------------------------------------------

%% @doc Fan-out of a single input; shorthand for `fanout(1, Copies)'.
fanout(Copies) ->
    fanout(1, Copies).

%% @doc Route over `In' inputs whose map cycles through 1..In `Copies' times,
%% giving `Copies * In' entries in total.
fanout(In, Copies) ->
    Total = Copies * In,
    Targets = [(Index rem In) + 1 || Index <- lists:seq(0, Total - 1)],
    route(In, Targets).

%% @doc Applies `Func' across the outputs of `List'.
%%
%% Both arguments are converted with to_op/1. The out-arity of `List' must be
%% a multiple of the in-arity of `Func'; that many copies of `Func' are then
%% placed in parallel after `List'. Otherwise the call fails with `badarg'.
%%
%% NOTE(review): an in-arity of 0 for `Func' makes the `rem' below raise
%% `badarith' rather than `badarg' - confirm whether that is intended.
map(Func, List) ->
    FOp = to_op(Func),
    LOp = to_op(List),
    FIn = ebb_prim:in_arity(FOp),
    LOut = ebb_prim:out_arity(LOp),
    case LOut rem FIn of
        0 ->
            Copies = lists:duplicate(LOut div FIn, FOp),
            pipe(LOp, par(Copies));
        _ ->
            erlang:error(badarg, [Func, List])
    end.

%% @doc Like map/2, but first pipes `List' into `split(N)'.
map_split(N, Func, List) ->
    Source = to_op(List),
    Splitter = split(N),
    map(Func, pipe(Source, Splitter)).

%% @doc Reduces the outputs of `List' with `Func' (see reduce_right/2).
%% A `badarg' thrown while building the reduction is re-raised as a
%% `badarg' error carrying the original arguments.
reduce(Func, List) ->
    try
        FOp = to_op(Func),
        LOp = to_op(List),
        reduce_right(FOp, LOp)
    catch
        throw:badarg ->
            erlang:error(badarg, [Func, List])
    end.
%% @doc Reduces the outputs of `List' with `Func', using `Z' as filler when
%% the outputs do not divide evenly among copies of `Func' (see
%% reduce_right/3).
%%
%% `Z' is converted with to_op/1 and must have in-arity 0. A `badarg' thrown
%% while building the reduction is re-raised as a `badarg' error whose
%% argument list is the full argument list of reduce/3.
reduce(Func, Z, List) ->
    try
        ZOp = to_op(Z),
        case ebb_prim:in_arity(ZOp) of
            0 -> ok;
            _ -> throw(badarg)
        end,
        reduce_right(to_op(Func), ZOp, to_op(List))
    catch
        throw:badarg ->
            %% The second argument of erlang:error/2 is the argument list of
            %% the current function, so it must carry all three arguments;
            %% omitting Z attributes the failure to reduce/2.
            erlang:error(badarg, [Func, Z, List])
    end.

%% @doc Like reduce/2, but first pipes `List' into `split(N)'.
reduce_split(N, Func, List) ->
    reduce(Func, pipe(to_op(List), split(N))).

%% Repeatedly appends layers of `FOp' after `LOp' until a single `FOp'
%% consumes all remaining outputs.
%%
%%  - no outputs at all:                  `LOp' is returned unchanged
%%  - fewer outputs than `FOp' consumes:  throws `badarg'
%%  - exactly one `FOp' worth of outputs: `LOp' piped into `FOp'
%%  - N copies fit exactly:               add a layer of N copies and recurse
%%  - N copies fit with M left over:      add a layer of `id(M)' followed by
%%                                        N copies and recurse
%%
%% NOTE(review): an in-arity of 0 for `FOp' makes the `div' raise `badarith',
%% which reduce/2 does not convert to `badarg'. The recursion also appears
%% not to terminate when a layer does not reduce the output count - confirm
%% against the arities ebb_prim can produce.
reduce_right(FOp, LOp) ->
    FIn = ebb_prim:in_arity(FOp),
    LOut = ebb_prim:out_arity(LOp),
    case {LOut, LOut div FIn, LOut rem FIn} of
        {0, 0, _} ->
            LOp;
        {_, 0, _} ->
            throw(badarg);
        {_, 1, 0} ->
            pipe(LOp, FOp);
        {_, N, 0} ->
            reduce_right(FOp, pipe(LOp, par(lists:duplicate(N, FOp))));
        {_, N, M} ->
            reduce_right(
              FOp,
              pipe(LOp, par([id(M) | lists:duplicate(N, FOp)])))
    end.

%% Same layering as reduce_right/2, but left-over outputs are topped up with
%% copies of `ZOp' so that one more `FOp' can consume them.
%%
%% `Need' is the number of inputs missing to fill one more `FOp'. Padding is
%% only possible when `Need' is a whole multiple of the out-arity of `ZOp';
%% when padding is required and that does not hold, `badarg' is thrown.
%%
%% NOTE(review): `Need div ZOut' is evaluated before any clause is tried, so
%% an out-arity of 0 for `ZOp' (or an in-arity of 0 for `FOp') raises
%% `badarith' even for inputs that need no padding - confirm whether that
%% should be reported as `badarg' instead.
reduce_right(FOp, ZOp, LOp) ->
    FIn = ebb_prim:in_arity(FOp),
    LOut = ebb_prim:out_arity(LOp),
    ZOut = ebb_prim:out_arity(ZOp),
    N = LOut div FIn,
    Rem = LOut rem FIn,
    Need = FIn - Rem,
    case {LOut, N, Rem, Need div ZOut, Need rem ZOut} of
        {0, 0, _, _, _} ->
            %% Nothing to reduce.
            LOp;
        {_, 0, _, Z, 0} ->
            %% Fewer outputs than one FOp consumes: pad and finish.
            pipe(par([LOp | lists:duplicate(Z, ZOp)]), FOp);
        {_, 1, 0, _, _} ->
            %% Exactly one FOp worth of outputs: finish.
            pipe(LOp, FOp);
        {_, N, 0, _, _} ->
            %% N is already bound, so this clause matches whenever Rem is 0.
            reduce_right(
              FOp, ZOp,
              pipe(LOp, par(lists:duplicate(N, FOp))));
        {_, N, _, Z, 0} ->
            %% Left-over outputs: pad them so N+1 copies of FOp fit.
            reduce_right(
              FOp, ZOp,
              pipe(par([LOp | lists:duplicate(Z, ZOp)]),
                   par(lists:duplicate(N + 1, FOp))));
        {_, _, _, _, _} ->
            throw(badarg)
    end.

%% @doc `map(Map, List)' followed by `reduce(Red, ...)'.
map_reduce(Map, Red, List) ->
    reduce(Red, map(Map, List)).

%% @doc `map(Map, List)' followed by `reduce(Red, Z, ...)'.
map_reduce(Map, Red, Z, List) ->
    reduce(Red, Z, map(Map, List)).

%% @doc `map_split(N, Map, List)' followed by `reduce(Red, ...)'.
map_reduce_split(N, Map, Red, List) ->
    reduce(Red, map_split(N, Map, List)).
%%%-----------------------------------------------------------------------------
%%% Sequencing
%%%-----------------------------------------------------------------------------

%% @doc Chains the operations in `Xs' one stage at a time.
%%
%% For a non-empty list every element is converted with to_op/1. The first
%% operation is placed next to an identity over the inputs it does not
%% consume. Each following operation is placed between an identity over the
%% outputs produced so far and an identity over the inputs still unconsumed,
%% and is joined to the previous stages with sync/2. The total input count
%% comes from ebb_prim:flatten_arity/1.
%%
%% Any other argument (including the empty list) is converted with to_op/1.
seq(Xs = [_|_]) ->
    Ops = [to_op(X) || X <- Xs],
    [First | Others] = Ops,
    {TotalIn, _} = ebb_prim:flatten_arity(Ops),
    FirstOut = ebb_prim:out_arity(First),
    FirstLeft = TotalIn - ebb_prim:in_arity(First),
    AddStage =
        fun(Op, {Acc, Produced, Remaining}) ->
                ProducedNext = Produced + ebb_prim:out_arity(Op),
                RemainingNext = Remaining - ebb_prim:in_arity(Op),
                Stage = par([id(Produced), Op, id(RemainingNext)]),
                {sync(Acc, Stage), ProducedNext, RemainingNext}
        end,
    Initial = {par(First, id(FirstLeft)), FirstOut, FirstLeft},
    {Seq, _, _} = lists:foldl(AddStage, Initial, Others),
    Seq;
seq(X) ->
    to_op(X).

%% @doc Shorthand for `seq([X, Y])'.
seq(X, Y) ->
    seq([X, Y]).

%% @doc Route over `N' inputs with an empty map: `ebb_prim:route(N, [])'.
ignore(N) ->
    ebb_prim:route(N, []).

%%%-----------------------------------------------------------------------------
%%% Choice
%%%-----------------------------------------------------------------------------

%% @doc Alias for switch/1.
cases(Branches) ->
    switch(Branches).

%% @doc Builds `cases(Branches)' and feeds it from `Disc' (converted with
%% to_op/1) placed next to an identity over the remaining inputs of the
%% cases operation (its in-arity minus one).
case_of(Disc, Branches) ->
    Cases = cases(Branches),
    Selector = to_op(Disc),
    PassThrough = id(ebb_prim:in_arity(Cases) - 1),
    pipe_pars([[Selector, PassThrough], Cases]).

%% @doc Two-way choice: `Then' is tagged `true' and `Else' is tagged `false'.
iff(Then, Else) ->
    cases([{true, Then}, {false, Else}]).

%% @doc Two-way choice driven by `Test', via case_of/2 with the branches
%% tagged `true' and `false'.
if_else(Test, Then, Else) ->
    case_of(Test, [{true, Then}, {false, Else}]).

%% @doc Cases tagged 1..Copies, where branch `I' is `select(I, Copies)'.
fanin(Copies) ->
    Branch = fun(I) -> {I, select(I, Copies)} end,
    cases([Branch(I) || I <- lists:seq(1, Copies)]).

%% @doc Cases tagged 1..Copies over `Copies * Out' inputs, where branch `I'
%% routes the `Out' consecutive positions starting at `(I-1)*Out + 1'.
fanin(Copies, Out) ->
    In = Copies * Out,
    Branch =
        fun(I) ->
                Base = (I - 1) * Out,
                Positions = [Base + J || J <- lists:seq(1, Out)],
                %% Fully-qualified call kept exactly as in the original.
                {I, ebb_flow:route(In, Positions)}
        end,
    cases([Branch(I) || I <- lists:seq(1, Copies)]).