Sfoglia il codice sorgente

Simplified and reduced the number of control primitives.

Operations are now one of:
  func, value, pipe, par, route, sync, split, merge, switch
Paul Downen 14 anni fa
parent
commit
f9a5f64fb0
2 ha cambiato i file con 104 aggiunte e 141 eliminazioni
  1. 51 54
      include/ebb_prim.hrl
  2. 53 87
      src/ebb_prim.erl

+ 51 - 54
include/ebb_prim.hrl

@@ -1,66 +1,63 @@
-% operation A B = A |->| B
-% task B = operation () B = () |->| B
-
-% operation ::= func  | value    | dynamic  | route
-%             | pipe  | parallel | sequence | funnel
-%             | split | merge    | switch   | loop
-
-% func (A_1 * ... * A_n) B ::=
-%   { in = n : number, code : fun(A_1, ..., A_n) -> B }
+%% operation(A, B) = A |->| B
+%% task(B) = operation((), B) = () |->| B = |<B>|
+
+%% operation(A, B) ::= func_F(A, B) 
+%%                   | value_X(A, B)
+%%                   | pipe_Op1,...,Op_I(A, B)
+%%                   | par_Op1,...,Op_I(A, B)
+%%                   | route_N,M,R(A, B)
+%%                   | sync_N(A, B)
+%%                   | split_N(A, B)
+%%                   | merge_N(A, B)
+%%                   | switch_I,N(A, B)
+
+%% func_F(A_1 * ... * A_n, B) ::= { in = arity(F) : number, code = F }
+%%   where F : fun(A_1, ..., A_n) -> B
 -record(func, {in, code}).
 
-% value () B ::= { value : B }
+%% value_X((), B) ::= { value = X }
+%%   where X : B
 -record(value, {value}).
 
-% dynamic (A_1 * ... * A_n) (operation () B) ::=
-%   { in = n : number, code : fun(A_1, ..., A_n) -> operation () B }
--record(dynamic, {in, code}).
-
-% route_R A (route_map R A) ::=
-%   { in = size(A) : number, out = length(R) : number,
-%     map = R : [ (1..size(A)) ] }
--record(route, {in, out, map}).
-
-% pipe (A_1 * ... * A_n) (B_1 * ... * B_m) ::=
-%   { in = n : number, out = m : number,
-%     ops : [ operation (A_1_1 * ... * A_1_n1) (A_2_1 * ... * A_2_n2),
-%             operation (A_2_1 * ... * A_2_n2) (A_3_1 * ... * A_3_n3),
-%             ...,
-%             operation (A_i_1 * ... * A_i_ni) (B_1 * ... * B_m) ] }
+%% pipe_Op_1,Op_2,...,Op_I(A_1_1 * ... * A_1_N1, B_1 * ... * B_M) ::=
+%%   { in = N1 : number, out = M : number,
+%%     ops : [ Op_1, Op_2, ..., Op_i ] }
+%%   where Op_1 : A_1_1 * ... * A_1_N1 |->| A_2_1 * ... * A_2_N2
+%%         Op_2 : A_2_1 * ... * A_2_N2 |->| A_3_1 * ... * A_3_N3
+%%         ...
+%%         Op_I : A_I_1 * ... * A_I_NI |->| B_1 * ... * B_M
 -record(pipe, {in, out, ops}).
 
-% parallel (flatten([A_1, ..., A_n])) (flatten([B_1, ..., B_n])) ::=
-%   { in = size(flatten([A_1, ..., A_n])) : number,
-%     out = size(flatten([B_1, ..., B_n])) : number,
-%     ops : [ operation A_1 B_1, ..., operation A_n B_n ] }
--record(parallel, {in, out, ops}).
+%% par_Op_1,Op2_,...,Op_I(A, B) ::=
+%%   { in = size(A) : number,
+%%     out = size(B) : number,
+%%     ops : [ Op_1, Op_2, ..., Op_I ] }
+%%   where Op_1 : A_1 |->| B_1
+%%         Op_2 : A_2 |->| B_2
+%%         ...
+%%         Op_I : A_I |->| B_I
+%%         A = flatten([A_1, A_2, ..., A_I])
+%%         B = flatten([B_1, B_2, ..., B_I])
+-record(par, {in, out, ops}).
+
+%% route_N,M,R(A, [ sources(R, Target) || Target <- [1..M] ]) ::=
+%%   { in = N, out = M, map = R }
+%%   where N : number, M : number
+%%         R : [{source, target}]
+%%         source : [1..N], target : [1..M]
+-record(route, {in, out, map}).
 
-% sequence (flatten([A_1, ..., A_n])) (flatten([B_1, ..., B_n])) ::=
-%   { in = size(flatten([A_1, ..., A_n])) : number,
-%     out = size(flatten([B_1, ..., B_n])) : number,
-%     ops : [ operation A_1 B_1, ..., operation A_n B_n ] }
--record(sequence, {in, out, ops}).
+%% sync_N(A_1 * ... * A_N, A_1 * ... A_N) ::= { size = N : number }
+-record(sync, {size}).
 
-% split (unit (A_1 * ... * A_n)) (A_1 * ... * A_n) ::= { size = n : number }
+%% split_N(unit(A_1 * ... * A_N), A_1 * ... * A_N) ::= { size = N : number }
 -record(split, {size}).
 
-% merge (A_1 * ... * A_n) (unit (A_1 * ... * A_n)) ::= { size = n : number }
+%% merge_N(A_1 * ... * A_N, unit(A_1 * ... * A_N)) ::= { size = N : number }
 -record(merge, {size}).
 
-% funnel (A * ... * A) A ::= { size = size(A * ... * A) : number }
--record(funnel, {size}).
-
-% switch (A_1 * ... * A_n) (B_1 + ... + B_m) ::=
-%   { in = n : number, out = m : number,
-%     switch : operation (A_1, ..., A_n) (Tag_1 + ... + Tag_m),
-%     map    : [ {Tag_1, operation (A_1 * ... * A_n) B_1},
-%                ...,
-%                {Tag_m, operation (A_1 * ... * A_n) B_m} ] }
--record(switch, {in, out, switch, map}).
-
-% loop (A_1 * ... * A_n * C_1 * ... * C_i) (B_1 * ... * B_m) ::=
-%   { in = n : number, out = m : number,
-%     init : operation (C_1 * ... * C_i) (S_1 * ... * S_j),
-%     op   : operation (A_1 * ... * A_n * S_1 * ... * S_j)
-%                      (B_1 * ... * B_m * S_1 * ... * S_j) }
--record(loop, {in, out, init, op}).
+%% switch_I,N((T_1 + ... + T_I) * A_1 * ... * A_N, 
+%%            duplicate(I, A_1 * ... * A_N)) ::=
+%%   { in = 1+N : number, out = I*N : number,
+%%     map    : [ {T_J, J} || J <- [1..I] ] }
+-record(switch, {in, out, map}).

+ 53 - 87
src/ebb_prim.erl

@@ -2,9 +2,10 @@
 -include("../include/ebb_prim.hrl").
 
 %%% Operation construction
--export([func/1, value/1, values/1, dynamic/1, route/2,
-	 pipe/1, parallel/1, sequence/1, funnel/1,
-	 split/1, merge/1, switch/2, loop/2]).
+-export([func/1, value/1, values/1,
+	 pipe/1, par/1,
+	 route/3, sync/1,
+	 split/1, merge/1, switch/2]).
 
 %%% Operation querying
 -export([in_arity/1, out_arity/1, can_connect/2]).
@@ -21,31 +22,14 @@ func(F) when is_function(F) ->
 value(X) -> #value{value=X}.
 
 values(Xs = [_|_]) ->
-    parallel(lists:map(fun value/1, Xs)).
-
-dynamic(Gen) when is_function(Gen) ->
-    {arity, N} = erlang:fun_info(Gen, arity),
-    #dynamic{in=N, code=Gen}.
-
-route(N, Map) ->
-    try
-	lists:foldl(fun(Target, Count) when Target =< N -> Count+1;
-		       (Target, _Count) when Target > N -> throw(out_of_bounds)
-		    end,
-		    0, Map)
-    of 
-	Count -> #route{in=N, out=Count, map=Map}
-    catch
-	throw:out_of_bounds -> erlang:error(badarg, [N, Map])
-    end.
+    par([ value(X) || X <- Xs ]).
 
 pipe(Ops = [First|Rest]) ->
     try 
-	lists:foldl(fun(Op2, Op1) ->
-			       case can_connect(Op1, Op2) of
-				   true  -> Op2;
-				   false -> throw(bad_connection)
-			       end
+	lists:foldl(fun(Op2, Op1) -> case can_connect(Op1, Op2) of
+					 true  -> Op2;
+					 false -> throw(bad_connection)
+				     end
 		    end,
 		    First, Rest)
     of
@@ -54,92 +38,74 @@ pipe(Ops = [First|Rest]) ->
 	throw:bad_connection -> erlang:error(badarg, [Ops])
     end.
 
-parallel(Ops = [_|_]) ->
+par(Ops = [_|_]) ->
     {In, Out} = flatten_arity(Ops),
-    #parallel{in=In, out=Out, ops=Ops}.
+    #par{in=In, out=Out, ops=Ops}.
+
+route(N, M, Map) ->
+    Good = lists:all(fun({Source, Target}) when Source >= 1, Source =< N,
+						Target >= 1, Target =< M ->
+			     true;
+			({_Source, _Target}) ->
+			     false
+		     end,
+		     Map),
+    case Good of
+	true  -> #route{in=N, out=M, map=Map};
+	false -> erlang:error(badarg, [N, M, Map])
+    end.
 
-sequence(Ops = [_|_]) ->
-    {In, Out} = flatten_arity(Ops),
-    #sequence{in=In, out=Out, ops=Ops}.
+sync(N) -> #sync{size=N}.
 
 split(N) -> #split{size=N}.
 
 merge(N) -> #merge{size=N}.
 
-funnel(N) -> #funnel{size=N}.
-
-switch(Switch, Map = [{_, Op1}|Rest]) ->
+switch(N, Map = [_|_]) ->
+    SMap = lists:ukeysort(2, Map),
     try
-	{I, O} = lists:foldl(
-		   fun({_, Op}, {In, Out}) -> known_op_arity(Op, In, Out) end,
-		   {in_arity(Op1), out_arity(Op1)}, Rest),
-	known_op_arity(Switch, I, 1),
-	{I, O}
+	lists:foldl(fun({_T_I, I}, Prev_I) when I == 1+Prev_I -> I;
+		       ({_T_I, _I}, _Prev_I) -> throw(missing_path)
+		    end,
+		    0, SMap)
     of
-	{In, Out} -> #switch{in=In, out=Out, switch=Switch, map=Map}
+	I -> #switch{in=1+N, out=I*N, map=SMap}
     catch
-	error:function_clause -> erlang:error(badarg, [Switch, Map])
+	throw:missing_path -> erlang:error(badarg, [N, Map])
     end.
 
-loop(Init, Op) ->
-    S = out_arity(Init),
-    {In, Out} = case {in_arity(Op)-S, out_arity(Op)-S} of
-		    {I, O} when I >= 0, O > 0 -> {I+in_arity(Init), O};
-		    {_, _} -> erlang:error(badarg, [Init, Op])
-		end,
-    #loop{in=In, out=Out, init=Init, op=Op}.
-
 %%%-----------------------------------------------------------------------------
 %%% Operation querying
 %%%-----------------------------------------------------------------------------
 
-in_arity(#func{in=In})     -> In;
-in_arity(#value{})         -> 0;
-in_arity(#dynamic{in=In})  -> In;
-in_arity(#route{in=In})    -> In;
-in_arity(#pipe{in=In})     -> In;
-in_arity(#parallel{in=In}) -> In;
-in_arity(#sequence{in=In}) -> In;
-in_arity(#split{})         -> 1;
-in_arity(#merge{size=N})   -> N;
-in_arity(#funnel{size=N})  -> N;
-in_arity(#switch{in=In})   -> In;
-in_arity(#loop{in=In})     -> In.
-
-out_arity(#func{})            -> 1;
-out_arity(#value{})           -> 1;
-out_arity(#dynamic{})         -> unknown;
-out_arity(#route{out=Out})    -> Out;
-out_arity(#pipe{out=Out})     -> Out;
-out_arity(#parallel{out=Out}) -> Out;
-out_arity(#sequence{out=Out}) -> Out;
-out_arity(#split{size=N})     -> N;
-out_arity(#merge{})           -> 1;
-out_arity(#funnel{})          -> 1;
-out_arity(#switch{out=Out})   -> Out;
-out_arity(#loop{out=Out})     -> Out.
+in_arity(#func{in=In})   -> In;
+in_arity(#value{})       -> 0;
+in_arity(#pipe{in=In})   -> In;
+in_arity(#par{in=In})    -> In;
+in_arity(#route{in=In})  -> In;
+in_arity(#sync{size=N})  -> N;
+in_arity(#split{})       -> 1;
+in_arity(#merge{size=N}) -> N;
+in_arity(#switch{in=In}) -> In.
+
+out_arity(#func{})          -> 1;
+out_arity(#value{})         -> 1;
+out_arity(#pipe{out=Out})   -> Out;
+out_arity(#par{out=Out})    -> Out;
+out_arity(#route{out=Out})  -> Out;
+out_arity(#sync{size=N})    -> N;
+out_arity(#split{size=N})   -> N;
+out_arity(#merge{})         -> 1;
+out_arity(#switch{out=Out}) -> Out.
 
 flatten_arity([First|Rest]) ->
     lists:foldl(fun(Op, {In, Out}) ->
-			{add_arity(In, in_arity(Op)),
-			 add_arity(Out, out_arity(Op))}
+			{In + in_arity(Op), Out + out_arity(Op)}
 	  end,
 	  {in_arity(First), out_arity(First)}, Rest).
 
-add_arity(unknown, _) -> unknown;
-add_arity(_, unknown) -> unknown;
-add_arity(N, M)       -> N + M.
-
-known_arity(N, N)       -> N;
-known_arity(unknown, N) -> N;
-known_arity(N, unknown) -> N.
-
-known_op_arity(Op, In, Out) ->
-    {known_arity(in_arity(Op), In), known_arity(out_arity(Op), Out)}.
-
 can_connect(Op1, Op2) ->
     case {out_arity(Op1), in_arity(Op2)} of
 	{N,       N} -> true;
-	{unknown, _} -> true;
 	_            -> false
     end.