Sfoglia il codice sorgente

Merge branch 'master' of github.com:aisamanra/ebb

getty 13 anni fa
parent
commit
23e5bb0b1e
6 ha cambiato i file con 627 aggiunte e 53 eliminazioni
  1. 0 5
      include/ebb_pipe.hrl
  2. 63 4
      include/ebb_prim.hrl
  3. 261 0
      src/ebb_flow.erl
  4. 0 22
      src/ebb_pipe.erl
  5. 118 22
      src/ebb_prim.erl
  6. 185 0
      src/ebb_run.erl

+ 0 - 5
include/ebb_pipe.hrl

@@ -1,5 +0,0 @@
--record(pipe, {ops}).
-
--record(feed, {task, op}).
-
--record(sequence, {tasks}).

+ 63 - 4
include/ebb_prim.hrl

@@ -1,7 +1,66 @@
--record(thunk, {code}).
+%% operation(A, B) = A |->| B
+%% task(B) = operation((), B) = () |->| B = |<B>|
 
--record(func, {code}).
+%% operation(A, B) ::= func_F(A, B) 
+%%                   | value_X(A, B)
+%%                   | pipe_Op1,...,Op_I(A, B)
+%%                   | par_Op1,...,Op_I(A, B)
+%%                   | route_N,M,R(A, B)
+%%                   | sync_N(A, B)
+%%                   | split_N(A, B)
+%%                   | merge_N(A, B)
+%%                   | switch_I,N(A, B)
 
--record(dynamic, {code}).
+%% func_F(A_1 * ... * A_n, B) ::= { in = arity(F) : number, code = F }
+%%   where F : fun(A_1, ..., A_n) -> B
+-record(func, {in, code}).
 
--record(return, {val}).
+%% value_X((), B) ::= { value = X }
+%%   where X : B
+-record(value, {value}).
+
+%% pipe_Op_1,Op_2,...,Op_I(A_1_1 * ... * A_1_N1, B_1 * ... * B_M) ::=
+%%   { in = N1 : number, out = M : number,
+%%     ops : [ Op_1, Op_2, ..., Op_i ] }
+%%   where Op_1 : A_1_1 * ... * A_1_N1 |->| A_2_1 * ... * A_2_N2
+%%         Op_2 : A_2_1 * ... * A_2_N2 |->| A_3_1 * ... * A_3_N3
+%%         ...
+%%         Op_I : A_I_1 * ... * A_I_NI |->| B_1 * ... * B_M
+-record(pipe, {in, out, ops}).
+
+%% par_Op_1,Op2_,...,Op_I(A, B) ::=
+%%   { in = size(A) : number,
+%%     out = size(B) : number,
+%%     ops : [ Op_1, Op_2, ..., Op_I ] }
+%%   where Op_1 : A_1 |->| B_1
+%%         Op_2 : A_2 |->| B_2
+%%         ...
+%%         Op_I : A_I |->| B_I
+%%         A = flatten([A_1, A_2, ..., A_I])
+%%         B = flatten([B_1, B_2, ..., B_I])
+-record(par, {in, out, ops}).
+
+%% route_N,R(A, [ A_I || I <- R ]) ::=
+%%   { in = N, out = length(R), map = R }
+%%   where N : number
+%%         R : [source]
+%%         source : [1..N]
+-record(route, {in, out, map}).
+
+%% sync_N(A_1 * ... * A_N, A_1 * ... A_N) ::= { size = N : number }
+-record(sync, {size}).
+
+%% split_N(unit(A_1 * ... * A_N), A_1 * ... * A_N) ::= { size = N : number }
+-record(split, {size}).
+
+%% merge_N(A_1 * ... * A_N, unit(A_1 * ... * A_N)) ::= { size = N : number }
+-record(merge, {size}).
+
+%% switch_Op_1,...,Op_I((T_1 + ... + T_I) * A_1 * ... * A_N, 
+%%                      (B_1_1 + ... + B_I_1) * ... * (B_1_M + ... + B_I_M)) ::=
+%%   { in = 1+N : number, out = M : number,
+%%     map    : [ {T_J, Op_J} || J <- [1..I] ] }
+%%   where Op_1 : A_1 * ... * A_N |->| B_1_1 * ... * B_1_M
+%%         ...
+%%         Op_I : A_1 * ... * A_N |->| B_I_1 * ... * B_I_M
+-record(switch, {in, out, map}).

+ 261 - 0
src/ebb_flow.erl

@@ -0,0 +1,261 @@
+-module(ebb_flow).
+
+%%% Smart constructors
+-export([func/1, func/2,
+	 value/1, values/1,
+	 pipe/1, pipe/2,
+	 par/1, par/2,
+	 route/2,
+	 sync/1, sync/2,
+	 split/1, split/2,
+	 merge/1, merge/2,
+	 switch/1]).
+-export([to_op/1, par_pipes/1, pipe_pars/1]).
+
+%%% Convenience operations
+-export([id/0, id/1, nop/0, select/2]).
+
+%%% Parallelism
+-export([fanout/1, fanout/2,
+	 map/2, map/3,
+	 reduce/2, reduce/3,
+	 map_reduce/3, map_reduce/4]).
+
+%%% Sequencing
+-export([seq/1, seq/2]).
+
+%%% Choice
+-export([cases/1, case_of/2,
+	 iff/2, if_else/3,
+	 fanin/1, fanin/2]).
+
+%%%-----------------------------------------------------------------------------
+%%% Smart constructors
+%%%-----------------------------------------------------------------------------
+
+value(X) ->
+    ebb_prim:value(X).
+
+values(Xs) ->
+    ebb_prim:par([ ebb_prim:value(X) || X <- Xs ]).
+
+func(F) ->
+    ebb_prim:func(F).
+
+func(F, N) ->
+    ebb_prim:pipe([ebb_prim:func(F), ebb_prim:split(N)]).
+
+pipe(Xs) ->
+    case {ebb_prim:is_operation(Xs), is_list(Xs), is_tuple(Xs)} of
+	{true, _, _} -> Xs;
+	{_, true, _} -> ebb_prim:pipe([ to_op(X) || X <- Xs ]);
+	{_, _, true} -> ebb_prim:pipe([ to_op(X) || X <- tuple_to_list(Xs) ]);
+	{_, _, _}    -> to_op(Xs)
+    end.
+
+pipe(X, Y) ->
+    pipe([X, Y]).
+
+par(Xs) ->
+    case {ebb_prim:is_operation(Xs), is_list(Xs), is_tuple(Xs)} of
+	{true, _, _} -> Xs;
+	{_, true, _} -> ebb_prim:par([ to_op(X) || X <- Xs ]);
+	{_, _, true} -> ebb_prim:par([ to_op(X) || X <- tuple_to_list(Xs) ]);
+	{_, _, _}    -> to_op(Xs)
+    end.
+
+par(X, Y) ->
+    par([X, Y]).
+
+route(N, Map) ->
+    ebb_prim:route(N, Map).
+
+sync(N) ->
+    ebb_prim:sync(N).
+
+sync(X1, X2) ->
+    Op1 = to_op(X1),
+    Op2 = to_op(X2),
+    ebb_prim:pipe([Op1, sync(ebb_prim:out_arity(Op1)), Op2]).
+
+split(N) ->
+    ebb_prim:split(N).
+
+split(X1, X2) ->
+    Op1 = to_op(X1),
+    Op2 = to_op(X2),
+    ebb_prim:pipe([Op1, split(ebb_prim:in_arity(Op2)), Op2]).
+
+merge(N) ->
+    ebb_prim:merge(N).
+
+merge(X1, X2) ->
+    Op1 = to_op(X1),
+    Op2 = to_op(X2),
+    ebb_prim:pipe([Op1, merge(ebb_prim:out_arity(Op1)), Op2]).
+
+switch(Branches) ->
+    ebb_prim:switch([ {Tag, to_op(X)} || {Tag, X} <- Branches ]).
+
+to_op(X) ->
+    case {ebb_prim:is_operation(X), is_function(X), is_list(X), is_tuple(X)} of
+	{true, _, _, _} -> X;
+	{_, true, _, _} -> ebb_prim:func(X);
+	{_, _, true, _} -> ebb_prim:par([ to_op(Y) || Y <- X ]);
+	{_, _, _, true} -> ebb_prim:par([ to_op(Y) || Y <- tuple_to_list(X) ]);
+	{_, _, _, _}    -> ebb_prim:value(X)
+    end.
+
+par_pipes(Args) ->
+    ebb_prim:par([ nested_operation(X, fun pipe_pars/1) || X <- Args ]).
+
+pipe_pars(Args) ->
+    ebb_prim:pipe([ nested_operation(X, fun par_pipes/1) || X <- Args ]).
+
+nested_operation(X, Continue) ->
+    case {ebb_prim:is_operation(X), is_function(X), is_list(X), is_tuple(X)} of
+	{true, _, _, _} -> X;
+	{_, true, _, _} -> ebb_prim:func(X);
+	{_, _, true, _} -> Continue(X);
+	{_, _, _, true} -> Continue(tuple_to_list(X));
+	{_, _, _, _}    -> ebb_prim:value(X)
+    end.
+
+%%%-----------------------------------------------------------------------------
+%%% Convenience operations
+%%%-----------------------------------------------------------------------------
+
+id() ->
+    route(1, [1]).
+
+id(N) ->
+    route(N, lists:seq(1,N)).
+
+nop() ->
+    value('nop').
+
+select(I, N) ->
+    route(N, [I]).
+
+%%%-----------------------------------------------------------------------------
+%%% Parallelism
+%%%-----------------------------------------------------------------------------
+
+fanout(Copies) ->
+    fanout(1, Copies).
+
+fanout(In, Copies) ->
+    Out = Copies*In,
+    route(In, [ (I rem In) + 1 || I <- lists:seq(0, Out-1) ]).
+
+map(Func, List) ->
+    FOp = to_op(Func),
+    LOp = to_op(List),
+    FIn = ebb_prim:in_arity(FOp),
+    LOut = ebb_prim:out_arity(LOp),
+    case {LOut div FIn, LOut rem FIn} of
+	{N, 0} -> pipe(LOp, par(lists:duplicate(N, FOp)));
+	{_, _} -> erlang:error(badarg, [Func, List])
+    end.
+
+map(N, Func, List) ->
+    map(Func, pipe(to_op(List), split(N))).
+
+reduce(Func, List) ->
+    try
+	reduce_left(to_op(Func), to_op(List))
+    catch
+	throw:badarg -> erlang:error(badarg, [Func, List])
+    end.
+
+reduce(N, Func, List) ->
+    reduce(Func, pipe(to_op(List), split(N))).
+
+reduce_left(FOp, LOp) ->
+    FIn = ebb_prim:in_arity(FOp),
+    LOut = ebb_prim:out_arity(LOp),
+    case {LOut, LOut div FIn, LOut rem FIn} of
+	{0, 0, _} -> LOp;
+	{_, 0, _} -> throw(badarg);
+	{_, 1, 0} -> pipe(LOp, FOp);
+	{_, N, 0} -> reduce_right(
+		       FOp, pipe(LOp, par(lists:duplicate(N, FOp))));
+	{_, N, M} -> reduce_right(
+		       FOp, pipe(LOp, par(lists:append(lists:duplicate(N, FOp),
+						       [id(M)]))))
+    end.
+
+reduce_right(FOp, LOp) ->
+    FIn = ebb_prim:in_arity(FOp),
+    LOut = ebb_prim:out_arity(LOp),
+    case {LOut, LOut div FIn, LOut rem FIn} of
+	{0, 0, _} -> LOp;
+	{_, 0, _} -> throw(badarg);
+	{_, 1, 0} -> pipe(LOp, FOp);
+	{_, N, 0} -> reduce_left(
+		       FOp, pipe(LOp, par(lists:duplicate(N, FOp))));
+	{_, N, M} -> reduce_left(
+		       FOp, pipe(LOp, par([id(M) | lists:duplicate(N, FOp)])))
+    end.
+
+map_reduce(Map, Red, List) ->
+    reduce(Red, map(Map, List)).
+
+map_reduce(N, Map, Red, List) ->
+    reduce(Red, map(N, Map, List)).
+
+%%%-----------------------------------------------------------------------------
+%%% Sequencing
+%%%-----------------------------------------------------------------------------
+
+seq(Xs = [_|_]) ->
+    Ops = [Op1|Rest] = [ to_op(X) || X <- Xs ],
+    {In, _} = ebb_prim:flatten_arity(Ops),
+    Produced1 = ebb_prim:out_arity(Op1),
+    Remaining1 = In-ebb_prim:in_arity(Op1),
+    {Seq, _, _} =
+	lists:foldl(fun(Op, {Seq,Produced,Remaining}) ->
+			    Produced2 = Produced+ebb_prim:out_arity(Op),
+			    Remaining2 = Remaining-ebb_prim:in_arity(Op),
+			    {sync(Seq, par([id(Produced),
+					    Op,
+					    id(Remaining2)])),
+			     Produced2, Remaining2}
+		    end,
+		    {par(Op1, id(Remaining1)),
+		     Produced1, Remaining1},
+		    Rest),
+    Seq;
+seq(X) ->
+    to_op(X).
+
+seq(X, Y) ->
+    seq([X, Y]).
+
+%%%-----------------------------------------------------------------------------
+%%% Choice
+%%%-----------------------------------------------------------------------------
+
+cases(Branches) ->
+    switch(Branches).
+
+case_of(Disc, Branches) ->
+    Cases = cases(Branches),
+    pipe_pars([[to_op(Disc), id(ebb_prim:in_arity(Cases)-1)],
+	       Cases]).
+
+iff(Then, Else) ->
+    cases([{true, Then},
+	   {false, Else}]).
+
+if_else(Test, Then, Else) ->
+    case_of(Test, [{true, Then},
+		   {false, Else}]).
+
+fanin(Copies) ->
+    cases([ {I , select(I, Copies)} || I <- lists:seq(1, Copies) ]).
+
+fanin(Copies, Out) ->
+    In = Copies*Out,
+    cases([ {I, ebb_flow:route(In, [ (I-1)*Out + J || J <- lists:seq(1, Out)])}
+	    || I <- lists:seq(1, Copies) ]).

+ 0 - 22
src/ebb_pipe.erl

@@ -1,22 +0,0 @@
--module(ebb_pipe).
--include("../include/ebb_prim.hrl").
--include("../include/ebb_pipe.hrl").
--export([pipe/2, pipe/1, bind/2, feed/2, sequence/2, sequence/1]).
-
-pipe(Op1, Op2) ->
-    #pipe{ops=[Op1, Op2]}.
-
-pipe(Ops) ->
-    #pipe{ops=Ops}.
-
-bind(Task, FuncT) ->
-    feed(Task, #dynamic{code=FuncT}).
-
-feed(Task, Op) ->
-    #feed{task=Task, op=Op}.
-
-sequence(Task1, Task2) ->
-    #sequence{tasks=[Task1, Task2]}.
-
-sequence(Tasks) ->
-    #sequence{tasks=Tasks}.

+ 118 - 22
src/ebb_prim.erl

@@ -1,33 +1,129 @@
 -module(ebb_prim).
 -include("../include/ebb_prim.hrl").
--include("../include/ebb_pipe.hrl").
--export([task/2, task/1, return/1,
-	 func/1, app/2, dynamic/1,
-	 ignore/1, nop/0, id/0]).
 
-task(Func, Arg) ->
-    apply(func(Func), Arg).
+%%% Operation construction
+-export([func/1, value/1,
+	 pipe/1, par/1,
+	 route/2, sync/1,
+	 split/1, merge/1,
+	 switch/1]).
 
-task(Thunk) ->
-    #thunk{code=Thunk}.
+%%% Operation querying
+-export([is_operation/1,
+	 in_arity/1, out_arity/1,
+	 flatten_arity/1,
+	 can_connect/2]).
 
-return(Val) ->
-    #return{val=Val}.
+%%%-----------------------------------------------------------------------------
+%%% Operation construction
+%%%-----------------------------------------------------------------------------
 
-func(Func) ->
-    #func{code=Func}.
+func(F) when is_function(F) ->
+    {arity, N} = erlang:fun_info(F, arity),
+    #func{in=N, code=F}.
 
-app(Op, Arg) ->
-    #feed{task=return(Arg), op=Op}.
+value(X) -> #value{value=X}.
 
-dynamic(FuncT) ->
-    #dynamic{code=FuncT}.
+pipe(Ops = [First|Rest]) ->
+    try 
+	lists:foldl(fun(Op2, Op1) -> case is_operation(Op2) andalso
+					 can_connect(Op1, Op2) of
+					 true  -> Op2;
+					 false -> throw(badarg)
+				     end
+		    end,
+		    case is_operation(First) of
+			true  -> First;
+			false -> throw(badarg)
+		    end,
+		    Rest)
+    of
+	Last -> #pipe{in=in_arity(First), out=out_arity(Last), ops=Ops}
+    catch
+	throw:badarg -> erlang:error(badarg, [Ops])
+    end.
 
-ignore(Task) ->
-    dynamic(fun(_) -> Task end).
+par(Ops = [_|_]) ->
+    case lists:all(fun is_operation/1, Ops) of
+	true  -> ok;
+	false -> erlang:error(badarg, [Ops])
+    end,
+    {In, Out} = flatten_arity(Ops),
+    #par{in=In, out=Out, ops=Ops}.
 
-nop() ->
-    #return{val='nop'}.
+route(N, Map) when is_number(N) ->
+    Good = lists:all(fun(Source) ->is_number(Source) andalso
+				       Source > 0 andalso Source =< N
+		     end,
+		     Map),
+    case Good of
+	true  -> #route{in=N, out=length(Map), map=Map};
+	false -> erlang:error(badarg, [N, Map])
+    end.
 
-id() ->
-    #pipe{ops=[]}.
+sync(N) when is_number(N) -> #sync{size=N}.
+
+split(N) when is_number(N) -> #split{size=N}.
+
+merge(N) when is_number(N) -> #merge{size=N}.
+
+switch(Map = [{_Tag1, Op1}|Rest]) ->
+    In = in_arity(Op1),
+    Out = out_arity(Op1),
+    Match = lists:all(
+	      fun({_T_I, Op}) ->
+		      is_operation(Op) andalso
+			  in_arity(Op) == In andalso out_arity(Op) == Out
+	      end,
+	      Rest),
+    case Match of
+	true  -> #switch{in=1+In, out=Out, map=Map};
+	false -> erlang:error(badarg, [Map])
+    end.
+
+%%%-----------------------------------------------------------------------------
+%%% Operation querying
+%%%-----------------------------------------------------------------------------
+
+is_operation(#func{})   -> true;
+is_operation(#value{})  -> true;
+is_operation(#pipe{})   -> true;
+is_operation(#par{})    -> true;
+is_operation(#route{})  -> true;
+is_operation(#sync{})   -> true;
+is_operation(#split{})  -> true;
+is_operation(#merge{})  -> true;
+is_operation(#switch{}) -> true;
+is_operation(_)         -> false.
+
+in_arity(#func{in=In})   -> In;
+in_arity(#value{})       -> 0;
+in_arity(#pipe{in=In})   -> In;
+in_arity(#par{in=In})    -> In;
+in_arity(#route{in=In})  -> In;
+in_arity(#sync{size=N})  -> N;
+in_arity(#split{})       -> 1;
+in_arity(#merge{size=N}) -> N;
+in_arity(#switch{in=In}) -> In.
+
+out_arity(#func{})          -> 1;
+out_arity(#value{})         -> 1;
+out_arity(#pipe{out=Out})   -> Out;
+out_arity(#par{out=Out})    -> Out;
+out_arity(#route{out=Out})  -> Out;
+out_arity(#sync{size=N})    -> N;
+out_arity(#split{size=N})   -> N;
+out_arity(#merge{})         -> 1;
+out_arity(#switch{out=Out}) -> Out.
+
+flatten_arity([First|Rest]) ->
+    lists:foldl(fun(Op, {In, Out}) ->
+			{In + in_arity(Op), Out + out_arity(Op)}
+	  end,
+	  {in_arity(First), out_arity(First)}, Rest).
+
+can_connect(Op1, Op2) ->
+    case {out_arity(Op1), in_arity(Op2)} of
+	{N,       N} -> true;
+	_            -> false
+    end.

+ 185 - 0
src/ebb_run.erl

@@ -0,0 +1,185 @@
+-module(ebb_run).
+-include("../include/ebb_prim.hrl").
+
+%%% Evaulation
+-export([linearize/1, run_linear/2, run_linear/1]).
+
+%%% Simplification
+-export([simpl/1]).
+
+%%%-----------------------------------------------------------------------------
+%%% Evaluation
+%%%-----------------------------------------------------------------------------
+
+linearize(Op) ->
+    fun(Args) -> run_linear(Op, Args) end.
+
+run_linear(Op) ->
+    run_linear(Op, []).
+
+run_linear(Op, Args) ->
+    case ebb_prim:is_operation(Op) andalso
+	ebb_prim:in_arity(Op) == length(Args) of
+	true  -> do_run_linear(Op, Args);
+	false -> erlang:error(badargs, [Op, Args])
+    end.
+
+do_run_linear(#value{value=X}, []) ->
+    list_output(X);
+do_run_linear(#func{code=F}, Args) ->
+    list_output(apply(F, Args));
+do_run_linear(#pipe{ops=Ops}, Args) ->
+    lists:foldl(fun do_run_linear/2, Args, Ops);
+do_run_linear(#par{ops=Ops}, Args) ->
+    {Result, []} =
+	lists:foldl(
+	  fun(Op, {Results, Remaining}) ->
+		  {Args1, Rest} = lists:split(ebb_prim:in_arity(Op), Remaining),
+		  {[do_run_linear(Op, Args1) | Results], Rest}
+	  end,
+	  {[], Args}, Ops),
+    lists:append(lists:reverse(Result));
+do_run_linear(#route{map=M}, Args) ->
+    [ lists:nth(I, Args) || I <- M ];
+do_run_linear(#sync{}, Args) ->
+    Args;
+do_run_linear(#split{}, Args) ->
+    Args;
+do_run_linear(#merge{}, Args) ->
+    Args;
+do_run_linear(#switch{map=M}, [Tag | Args]) ->
+    do_run_linear(element(2, lists:keyfind(Tag, 1, M)), Args).
+
+list_output(X) ->
+    if is_list(X) -> X;
+       is_tuple(X) -> tuple_to_list(X);
+       true -> [X]
+    end.
+    
+%%%-----------------------------------------------------------------------------
+%%% Simplification
+%%%-----------------------------------------------------------------------------
+
+simpl(Pipe = #pipe{ops=Inside}) ->
+    flatten(Pipe#pipe{ops=simpl_pipe([ simpl(O) || O <- Inside ])});
+simpl(Par = #par{ops=Inside}) ->
+    flatten(Par#par{ops=simpl_par([ simpl(O) || O <- Inside ])});
+simpl(Switch = #switch{map=Map}) ->
+    Switch#switch{map=[ {Tag, simpl(Op)} || {Tag, Op} <- Map ]};
+simpl(Op) ->
+    case ebb_prim:is_operation(Op) of
+	true  -> Op;
+	false -> erlang:error(badarg, [Op])
+    end.
+
+simpl_pipe(Ops) ->
+    lists:reverse(lists:foldl(fun simpl_pipe/2, [], Ops)).
+
+simpl_pipe(#pipe{ops=Ops}, Acc) ->
+    lists:reverse(Ops, Acc);
+simpl_pipe(Par = #par{}, [Any | Acc]) ->
+    simpl_pipe_par(Any, Par) ++ Acc;
+simpl_pipe(R2 = #route{in=N}, Acc) ->
+    case {R2 == ebb_flow:id(N), Acc} of
+	{true, _} -> Acc;
+	{false, [R1 = #route{} | Rest]} ->
+	    R3 = #route{in=N3} = simpl_pipe_route(R1, R2),
+	    case R3 == ebb_flow:id(N3) of
+		true -> Rest;
+		false -> [R3 | Rest]
+	    end;
+	{false, _} -> [R2 | Acc]
+    end;
+simpl_pipe(#split{size=N}, [#merge{size=N} | Acc]) ->
+    Acc;
+simpl_pipe(#merge{size=N}, [#split{size=N} | Acc]) ->
+    Acc;
+simpl_pipe(Op, Acc) ->
+    [Op | Acc].
+
+simpl_pipe_par(Par1 = #par{ops=Ops1}, Par2 = #par{ops=Ops2}) ->
+    case zip_pars(Ops1, Ops2) of
+	[{Ops1, Ops2}] -> [Par2, Par1];
+	Zip -> [permute_pipe_par(Zip)]
+    end;
+simpl_pipe_par(Any, Par = #par{ops=Ops}) ->
+    case zip_pars([Any], Ops) of
+	[{Any, Ops}] -> [Any, Par];
+	Zip -> [permute_pipe_par(Zip)]
+    end.
+
+permute_pipe_par(Zip) ->
+    ebb_prim:par(
+      lists:flatten(
+	[ case {O1, O2} of
+	      {[_|_],  [_|_]} ->
+		  [simpl(ebb_prim:pipe(
+			   [ flatten(ebb_prim:par(X))
+			     || X <- [O1, O2] ]))];
+	      {_, _} -> [O1, O2]
+	  end
+	  || {O1, O2} <- Zip ])).
+
+zip_pars(Par1, Par2) ->
+    zip_pars(1, Par1, [], 1, Par2, [], []).
+
+zip_pars(_, _Par1 = [], Prev1,
+	 _, Par2,       Prev2,
+	 Acc) ->
+    lists:reverse([ {lists:reverse(Prev1), lists:reverse(Prev2, Par2)} | Acc ]);
+zip_pars(_, Par1,       Prev1,
+	 _, _Par2 = [], Prev2,
+	Acc) ->
+    lists:reverse([ {lists:reverse(Prev1, Par1), lists:reverse(Prev2)} | Acc ]);
+zip_pars(M, _Par1 = [Op1|Rest1], Prev1,
+	 N, _Par2 = [Op2|Rest2], Prev2,
+	 Acc) ->
+    case {M+ebb_prim:out_arity(Op1), N+ebb_prim:in_arity(Op2), Prev2} of
+	{_, N, []} ->
+	    zip_pars(M, [Op1|Rest1], Prev1,
+		     N, Rest2, [],
+		     [ {[], Op2} | Acc ]);
+	{I, I, _} -> 
+	    zip_pars(I, Rest1, [],
+		     I, Rest2, [],
+		     [ {lists:reverse([Op1|Prev1]), lists:reverse([Op2|Prev2])}
+		       | Acc ]);
+	{M2, N2, _} when M2 < N2 ->
+	    zip_pars(M2, Rest1, [Op1|Prev1],
+		     N, [Op2|Rest2], Prev2,
+		     Acc);
+	{M2, N2, _} when N2 < M2 ->
+	    zip_pars(M, [Op1|Rest1], Prev1,
+		     N2, Rest2, [Op2|Prev2],
+		     Acc)
+    end.
+
+simpl_pipe_route(#route{in=In, map=M1}, #route{map=M2}) ->
+    ebb_prim:route(In, [ lists:nth(I, M1) || I <- M2 ]).
+
+simpl_par(Ops) ->
+    lists:reverse(lists:foldl(fun simpl_par/2, [], Ops)).
+
+simpl_par(#par{ops=Ops}, Acc) ->
+    lists:reverse(Ops, Acc);
+simpl_par(#route{in=0, out=0, map=[]}, Acc) ->
+    Acc;
+simpl_par(R2 = #route{}, [R1 = #route{} | Acc]) ->
+    [simpl_par_route(R1, R2) | Acc];
+simpl_par(Op, Acc) ->
+    [Op | Acc].
+
+simpl_par_route(#route{in=In1, map=M1}, #route{in=In2, map=M2}) ->
+    ebb_prim:route(In1+In2,
+		   lists:append(M1, [ S2+In1 || S2 <- M2 ])).
+
+flatten(#pipe{in=N, out=N, ops=[]}) ->
+    ebb_flow:id(N);
+flatten(#pipe{ops=[Single]}) ->
+    Single;
+flatten(#par{in=N, out=N, ops=[]}) ->
+    ebb_flow:id(N);
+flatten(#par{ops=[Single]}) ->
+    Single;
+flatten(Op) ->
+    Op.