Browse Source

Added support for funneling operations.

Paul Downen 13 years ago
parent
commit
0d05a5e6f8
3 changed files with 22 additions and 3 deletions
  1. 4 1
      include/flow_graph.hrl
  2. 13 1
      src/ebb_par.erl
  3. 5 1
      src/flow_graph.erl

+ 4 - 1
include/flow_graph.hrl

@@ -2,7 +2,7 @@
 % task B = operation () B = () |->| B
 
 % operation ::= func  | value    | dynamic  | route
-%             | pipe  | parallel | sequence
+%             | pipe  | parallel | sequence | funnel
 %             | split | merge    | switch   | loop
 
 % func (A_1 * ... * A_n) B ::=
@@ -47,6 +47,9 @@
 % merge (A_1 * ... * A_n) (unit (A_1 * ... * A_n)) ::= { size = n : number }
 -record(merge, {size}).
 
+% funnel (A * ... * A) A ::= { size = size(A * ... * A) : number }
+-record(funnel, {size}).
+
 % switch (A_1 * ... * A_n) (B_1 + ... + B_m) ::=
 %   { in = n : number, out = m : number,
 %     switch : operation (A_1, ..., A_n) (Tag_1 + ... + Tag_m),

+ 13 - 1
src/ebb_par.erl

@@ -1,6 +1,8 @@
 -module(ebb_par).
 -export([scatter/2, scatter/1,
-	 parallel/2, parallel/1]).
+	 parallel/2, parallel/1,
+	 fanout/2, fanout/1,
+	 funnel/2, funnel/1]).
 
 scatter(Task1, Task2) ->
     flow_graph:parallel([Task1, Task2]).
@@ -27,3 +29,13 @@ fanout([Op]) ->
 fanout(Ops = [_|_]) ->
     flow_graph:pipe([flow_graph:route(1, lists:duplicate(length(Ops), 1)),
 		     flow_graph:parallel(Ops)]).
+
+funnel(Op1, Op2) ->
+    flow_graph:pipe([flow_graph:parallel([Op1, Op2]),
+		     flow_graph:funnel(2)]).
+
+funnel([Op]) ->
+    Op;
+funnel(Ops = [_|_]) ->
+    flow_graph:pipe([flow_graph:parallel(Ops),
+		     flow_graph:funnel(length(Ops))]).

+ 5 - 1
src/flow_graph.erl

@@ -3,7 +3,7 @@
 
 %%% Operation construction
 -export([func/1, value/1, values/1, dynamic/1, route/2,
-	 pipe/1, parallel/1, sequence/1,
+	 pipe/1, parallel/1, sequence/1, funnel/1,
 	 split/1, merge/1, switch/2, loop/2]).
 
 %%% Operation querying
@@ -66,6 +66,8 @@ split(N) -> #split{size=N}.
 
 merge(N) -> #merge{size=N}.
 
+funnel(N) -> #funnel{size=N}.
+
 switch(Switch, Map = [{_, Op1}|Rest]) ->
     try
 	{I, O} = lists:foldl(
@@ -100,6 +102,7 @@ in_arity(#parallel{in=In}) -> In;
 in_arity(#sequence{in=In}) -> In;
 in_arity(#split{})         -> 1;
 in_arity(#merge{size=N})   -> N;
+in_arity(#funnel{size=N})  -> N;
 in_arity(#switch{in=In})   -> In;
 in_arity(#loop{in=In})     -> In.
 
@@ -112,6 +115,7 @@ out_arity(#parallel{out=Out}) -> Out;
 out_arity(#sequence{out=Out}) -> Out;
 out_arity(#split{size=N})     -> N;
 out_arity(#merge{})           -> 1;
+out_arity(#funnel{})          -> 1;
 out_arity(#switch{out=Out})   -> Out;
 out_arity(#loop{out=Out})     -> Out.