-module(flow_graph).

-include("../include/flow_graph.hrl").

%%% Operation construction
-export([func/1, value/1, values/1, dynamic/1, route/2, pipe/1, parallel/1,
         sequence/1, funnel/1, split/1, merge/1, switch/2, loop/2]).

%%% Operation querying
-export([in_arity/1, out_arity/1, can_connect/2]).

%%%-----------------------------------------------------------------------------
%%% Operation construction
%%%-----------------------------------------------------------------------------

%% @doc Wrap the fun `F' in a `#func{}' operation. The operation's input
%% arity is the arity of `F'.
func(F) when is_function(F) ->
    {arity, N} = erlang:fun_info(F, arity),
    #func{in=N, code=F}.

%% @doc Build a `#value{}' operation holding the term `X'.
value(X) ->
    #value{value=X}.

%% @doc Build a parallel group of `#value{}' operations, one per element of
%% the non-empty list `Xs'.
values(Xs = [_|_]) ->
    parallel(lists:map(fun value/1, Xs)).

%% @doc Wrap the fun `Gen' in a `#dynamic{}' operation. The operation's input
%% arity is the arity of `Gen'; its output arity is reported as `unknown' by
%% out_arity/1.
dynamic(Gen) when is_function(Gen) ->
    {arity, N} = erlang:fun_info(Gen, arity),
    #dynamic{in=N, code=Gen}.

%% @doc Build a `#route{}' with `N' inputs. `Map' is a list of targets; the
%% output arity is the number of entries in `Map'.
%%
%% Raises `badarg' (with `[N, Map]' as arguments) if any target compares
%% greater than `N'.
%% NOTE(review): targets below the first valid index are not rejected here;
%% confirm the indexing convention against the code that interprets the map.
route(N, Map) ->
    try
        lists:foldl(fun(Target, Count) when Target =< N -> Count + 1;
                       (Target, _Count) when Target > N -> throw(out_of_bounds)
                    end, 0, Map)
    of
        Count ->
            #route{in=N, out=Count, map=Map}
    catch
        throw:out_of_bounds ->
            erlang:error(badarg, [N, Map])
    end.

%% @doc Build a `#pipe{}' from the non-empty list `Ops'. Each adjacent pair
%% must satisfy can_connect/2. The pipe takes the input arity of the first
%% operation and the output arity of the last.
%%
%% Raises `badarg' (with `[Ops]' as arguments) if any adjacent pair cannot be
%% connected.
pipe(Ops = [First|Rest]) ->
    try
        %% The accumulator is the previous operation; each step checks it
        %% against the next one and then moves on.
        lists:foldl(fun(Op2, Op1) ->
                            case can_connect(Op1, Op2) of
                                true  -> Op2;
                                false -> throw(bad_connection)
                            end
                    end, First, Rest)
    of
        Last ->
            #pipe{in=in_arity(First), out=out_arity(Last), ops=Ops}
    catch
        throw:bad_connection ->
            erlang:error(badarg, [Ops])
    end.

%% @doc Build a `#parallel{}' from the non-empty list `Ops'. Input and output
%% arities are the sums over all operations, as computed by flatten_arity/1.
parallel(Ops = [_|_]) ->
    {In, Out} = flatten_arity(Ops),
    #parallel{in=In, out=Out, ops=Ops}.

%% @doc Build a `#sequence{}' from the non-empty list `Ops'. Input and output
%% arities are the sums over all operations, as computed by flatten_arity/1.
sequence(Ops = [_|_]) ->
    {In, Out} = flatten_arity(Ops),
    #sequence{in=In, out=Out, ops=Ops}.

%% @doc Build a `#split{}' of size `N' (one input, `N' outputs).
split(N) ->
    #split{size=N}.

%% @doc Build a `#merge{}' of size `N' (`N' inputs, one output).
merge(N) ->
    #merge{size=N}.

%% @doc Build a `#funnel{}' of size `N' (`N' inputs, one output).
funnel(N) ->
    #funnel{size=N}.

%% @doc Build a `#switch{}' from the selector operation `Switch' and `Map', a
%% non-empty list of `{Key, Op}' pairs (the keys are not inspected here).
%%
%% All mapped operations must agree on input and output arity, where an
%% `unknown' arity agrees with anything. `Switch' must take the same number
%% of inputs and have an output arity of 1 (or `unknown').
%%
%% Raises `badarg' (with `[Switch, Map]' as arguments) when the arities do
%% not agree.
switch(Switch, Map = [{_, Op1}|Rest]) ->
    try
        {I, O} = lists:foldl(
                   fun({_, Op}, {AccIn, AccOut}) ->
                           known_op_arity(Op, AccIn, AccOut)
                   end,
                   {in_arity(Op1), out_arity(Op1)},
                   Rest),
        %% Called only for its check: a mismatch surfaces as function_clause.
        known_op_arity(Switch, I, 1),
        {I, O}
    of
        {In, Out} ->
            #switch{in=In, out=Out, switch=Switch, map=Map}
    catch
        error:function_clause ->
            erlang:error(badarg, [Switch, Map])
    end.
%% @doc Build a `#loop{}' from the operations `Init' and `Op'.
%%
%% With `S' being the output arity of `Init', `Op' must accept at least `S'
%% inputs and produce more than `S' outputs. The loop's input arity is the
%% input arity of `Init' plus the inputs of `Op' beyond the first `S'; its
%% output arity is the outputs of `Op' beyond `S'.
%%
%% Raises `badarg' (with `[Init, Op]' as arguments) when these constraints do
%% not hold, including when one of the arities involved is `unknown'.
loop(Init, Op) ->
    S = out_arity(Init),
    case {in_arity(Op), out_arity(Op)} of
        %% The subtraction lives in the guard on purpose: if an arity is the
        %% atom `unknown' the arithmetic fails, the guard is simply false and
        %% we fall through to badarg instead of crashing with badarith.
        {OpIn, OpOut} when OpIn - S >= 0, OpOut - S > 0 ->
            #loop{in = OpIn - S + in_arity(Init),
                  out = OpOut - S,
                  init = Init,
                  op = Op};
        {_, _} ->
            erlang:error(badarg, [Init, Op])
    end.

%%%-----------------------------------------------------------------------------
%%% Operation querying
%%%-----------------------------------------------------------------------------

%% @doc Number of inputs the given operation consumes.
in_arity(#func{in=In})     -> In;
in_arity(#value{})         -> 0;
in_arity(#dynamic{in=In})  -> In;
in_arity(#route{in=In})    -> In;
in_arity(#pipe{in=In})     -> In;
in_arity(#parallel{in=In}) -> In;
in_arity(#sequence{in=In}) -> In;
in_arity(#split{})         -> 1;
in_arity(#merge{size=N})   -> N;
in_arity(#funnel{size=N})  -> N;
in_arity(#switch{in=In})   -> In;
in_arity(#loop{in=In})     -> In.

%% @doc Number of outputs the given operation produces, or the atom `unknown'
%% for a `#dynamic{}' operation (and for composites whose stored arity is
%% `unknown').
out_arity(#func{})             -> 1;
out_arity(#value{})            -> 1;
out_arity(#dynamic{})          -> unknown;
out_arity(#route{out=Out})     -> Out;
out_arity(#pipe{out=Out})      -> Out;
out_arity(#parallel{out=Out})  -> Out;
out_arity(#sequence{out=Out})  -> Out;
out_arity(#split{size=N})      -> N;
out_arity(#merge{})            -> 1;
out_arity(#funnel{})           -> 1;
out_arity(#switch{out=Out})    -> Out;
out_arity(#loop{out=Out})      -> Out.

%% Sum the input and output arities of a non-empty list of operations.
%% Either sum becomes `unknown' as soon as one contribution is `unknown'.
flatten_arity([First|Rest]) ->
    lists:foldl(fun(Op, {In, Out}) ->
                        {add_arity(In, in_arity(Op)),
                         add_arity(Out, out_arity(Op))}
                end,
                {in_arity(First), out_arity(First)},
                Rest).

%% Add two arities; `unknown' is absorbing.
add_arity(unknown, _) -> unknown;
add_arity(_, unknown) -> unknown;
add_arity(N, M)       -> N + M.

%% Unify two arities: equal values unify to themselves and `unknown' unifies
%% with anything. Two different known arities match no clause, so the call
%% fails with function_clause; switch/2 catches that and reports badarg.
known_arity(N, N)       -> N;
known_arity(unknown, N) -> N;
known_arity(N, unknown) -> N.

%% Unify the arities of `Op' with the expected `In' and `Out', returning the
%% unified `{In, Out}' pair (see known_arity/2 for the failure mode).
known_op_arity(Op, In, Out) ->
    {known_arity(in_arity(Op), In), known_arity(out_arity(Op), Out)}.

%% @doc Return `true' if the outputs of `Op1' can feed the inputs of `Op2':
%% either the output arity of `Op1' equals the input arity of `Op2', or the
%% output arity of `Op1' is `unknown'. Return `false' otherwise.
can_connect(Op1, Op2) ->
    case {out_arity(Op1), in_arity(Op2)} of
        {N, N}       -> true;
        {unknown, _} -> true;
        _            -> false
    end.