Przeglądaj źródła

Added smart constructors to ebb_flow.

Paul Downen 14 lat temu
rodzic
commit
45e0f998e0
1 zmienionych plików z 129 dodań i 35 usunięć
  1. 129 35
      src/ebb_flow.erl

+ 129 - 35
src/ebb_flow.erl

@@ -1,5 +1,17 @@
 -module(ebb_flow).
 
+%%% Smart constructors
+-export([func/1, func/2,
+	 value/1, values/1,
+	 pipe/1, pipe/2,
+	 par/1, par/2,
+	 route/2,
+	 sync/1, sync/2,
+	 split/1, split/2,
+	 merge/1, merge/2,
+	 switch/1]).
+-export([to_op/1, par_pipes/1, pipe_pars/1]).
+
 %%% Convenience operations
 -export([id/0, id/1, nop/0, select/2]).
 
@@ -7,28 +19,113 @@
 -export([fanout/1, fanout/2]).
 
 %%% Sequencing
--export([seq/1]).
+-export([seq/1, seq/2]).
 
 %%% Choice
 -export([cases/1, case_of/2,
 	 iff/2, if_else/3,
 	 fanin/1, fanin/2]).
 
+%%%-----------------------------------------------------------------------------
+%%% Smart constructors
+%%%-----------------------------------------------------------------------------
+
+value(X) ->
+    ebb_prim:value(X).
+
+values(Xs) ->
+    ebb_prim:par([ ebb_prim:value(X) || X <- Xs ]).
+
+func(F) ->
+    ebb_prim:func(F).
+
+func(F, N) ->
+    ebb_prim:pipe([ebb_prim:func(F), ebb_prim:split(N)]).
+
+pipe(Xs = [_|_]) ->
+    ebb_prim:pipe([ to_op(X) || X <- Xs ]);
+pipe(X) ->
+    to_op(X).
+
+pipe(X, Y) ->
+    pipe([X, Y]).
+
+par(Xs = [_|_]) ->
+    ebb_prim:par([ to_op(X) || X <- Xs ]);
+par(X) ->
+    to_op(X).
+
+par(X, Y) ->
+    par([X, Y]).
+
+route(N, Map) ->
+    ebb_prim:route(N, Map).
+
+sync(N) ->
+    ebb_prim:sync(N).
+
+sync(X1, X2) ->
+    Op1 = to_op(X1),
+    Op2 = to_op(X2),
+    ebb_prim:pipe([Op1, sync(ebb_prim:out_arity(Op1)), Op2]).
+
+split(N) ->
+    ebb_prim:split(N).
+
+split(X1, X2) ->
+    Op1 = to_op(X1),
+    Op2 = to_op(X2),
+    ebb_prim:pipe([Op1, split(ebb_prim:in_arity(Op2)), Op2]).
+
+merge(N) ->
+    ebb_prim:merge(N).
+
+merge(X1, X2) ->
+    Op1 = to_op(X1),
+    Op2 = to_op(X2),
+    ebb_prim:pipe([Op1, merge(ebb_prim:out_arity(Op1)), Op2]).
+
+switch(Branches) ->
+    ebb_prim:switch([ {Tag, to_op(X)} || {Tag, X} <- Branches ]).
+
+to_op(X) ->
+    case {ebb_prim:is_operation(X), is_function(X)} of
+	{true,  false} -> X;
+	{false, true}  -> func(X);
+	{false, false} -> value(X)
+    end.
+
+par_pipes(Args) ->
+    ebb_prim:par([ nested_operation(X, fun pipe_pars/1) || X <- Args ]).
+
+pipe_pars(Args) ->
+    ebb_prim:pipe([ nested_operation(X, fun par_pipes/1) || X <- Args ]).
+
+nested_operation(Ops, Continue) when is_list(Ops) ->
+    Continue(Ops);
+nested_operation(F, _Continue) when is_function(F) ->
+    ebb_prim:func(F);
+nested_operation(X, _Continue) ->
+    case ebb_prim:is_operation(X) of
+	true  -> X;
+        false -> ebb_prim:value(X)
+    end.
+
 %%%-----------------------------------------------------------------------------
 %%% Convenience operations
 %%%-----------------------------------------------------------------------------
 
 id() ->
-    ebb_prim:route(1, [1]).
+    route(1, [1]).
 
 id(N) ->
-    ebb_prim:route(N, lists:seq(1,N)).
+    route(N, lists:seq(1,N)).
 
 nop() ->
-    ebb_prim:value('nop').
+    value('nop').
 
 select(I, N) ->
-    ebb_prim:route(N, [I]).
+    route(N, [I]).
 
 %%%-----------------------------------------------------------------------------
 %%% Parallelism
@@ -39,63 +136,60 @@ fanout(Copies) ->
 
 fanout(In, Copies) ->
     Out = Copies*In,
-    ebb_prim:route(In, [ (I rem In) + 1 || I <- lists:seq(0, Out-1) ]).
+    route(In, [ (I rem In) + 1 || I <- lists:seq(0, Out-1) ]).
 
 %%%-----------------------------------------------------------------------------
 %%% Sequencing
 %%%-----------------------------------------------------------------------------
 
-seq(Ops = [Op1|Rest]) ->
+seq(Xs = [_|_]) ->
+    Ops = [Op1|Rest] = [ to_op(X) || X <- Xs ],
     {In, _} = ebb_prim:flatten_arity(Ops),
+    Produced1 = ebb_prim:out_arity(Op1),
+    Remaining1 = In-ebb_prim:in_arity(Op1),
     {Seq, _, _} =
 	lists:foldl(fun(Op, {Seq,Produced,Remaining}) ->
-			    Sync = ebb_prim:sync(Produced+Remaining),
-			    {ebb_prim:pipe(
-			       [Seq, Sync,
-				build_seq_step(Op, Produced, Remaining)]),
-			     Produced+ebb_prim:out_arity(Op),
-			     Remaining-ebb_prim:in_arity(Op)}
+			    Produced2 = Produced+ebb_prim:out_arity(Op),
+			    Remaining2 = Remaining-ebb_prim:in_arity(Op),
+			    {sync(Seq, par([id(Produced),
+					    Op,
+					    id(Remaining2)])),
+			     Produced2, Remaining2}
 		    end,
-		    {build_seq_step(Op1, 0, In),
-		     ebb_prim:out_arity(Op1),
-		     In-ebb_prim:in_arity(Op1)},
+		    {par(Op1, id(Remaining1)),
+		     Produced1, Remaining1},
 		    Rest),
-    Seq.
-
-build_seq_step(Op, Produced, Remaining) ->
-    case {Produced, Remaining-ebb_prim:in_arity(Op)} of
-	{0, 0}          -> Op;
-	{_, 0}          -> ebb_prim:par([id(Produced), Op]);
-	{0, Remaining2} -> ebb_prim:par([Op, id(Remaining2)]);
-	{_, Remaining2} -> ebb_prim:par([id(Produced), Op, id(Remaining2)])
-    end.
+    Seq;
+seq(X) ->
+    to_op(X).
+
+seq(X, Y) ->
+    seq([X, Y]).
 
 %%%-----------------------------------------------------------------------------
 %%% Choice
 %%%-----------------------------------------------------------------------------
 
 cases(Branches) ->
-    ebb_prim:switch(Branches).
+    switch(Branches).
 
 case_of(Disc, Branches) ->
     Cases = cases(Branches),
-    case ebb_prim:in_arity(Cases)-1 of
-	0 -> ebb_prim:pipe([Disc, Cases]);
-	In -> ebb_prim:pipe(
-		[ebb_prim:par([Disc, id(In)]),
-		 Cases])
-    end.
+    pipe_pars([[to_op(Disc), id(ebb_prim:in_arity(Cases)-1)],
+	       Cases]).
 
 iff(Then, Else) ->
-    cases([{true, Then}, {false, Else}]).
+    cases([{true, Then},
+	   {false, Else}]).
 
 if_else(Test, Then, Else) ->
-    case_of(Test, [{true, Then}, {false, Else}]).
+    case_of(Test, [{true, Then},
+		   {false, Else}]).
 
 fanin(Copies) ->
     cases([ {I , select(I, Copies)} || I <- lists:seq(1, Copies) ]).
 
 fanin(Copies, Out) ->
     In = Copies*Out,
-    cases([ {I, ebb_prim:route(In, [ (I-1)*Out + J || J <- lists:seq(1, Out)])}
+    cases([ {I, ebb_flow:route(In, [ (I-1)*Out + J || J <- lists:seq(1, Out)])}
 	    || I <- lists:seq(1, Copies) ]).