Parcourir la source

Added ebb_flow:map, ebb_flow:reduce, and ebb_flow:map_reduce.

Paul Downen il y a 13 ans
Parent
commit
60a562e9cf
1 fichiers modifiés avec 60 ajouts et 1 suppressions
  1. 60 1
      src/ebb_flow.erl

+ 60 - 1
src/ebb_flow.erl

@@ -16,7 +16,10 @@
 -export([id/0, id/1, nop/0, select/2]).
 
 %%% Parallelism
--export([fanout/1, fanout/2]).
+-export([fanout/1, fanout/2,
+	 map/2, map/3,
+	 reduce/2, reduce/3,
+	 map_reduce/3, map_reduce/4]).
 
 %%% Sequencing
 -export([seq/1, seq/2]).
@@ -138,6 +141,62 @@ fanout(In, Copies) ->
     Out = Copies*In,
     route(In, [ (I rem In) + 1 || I <- lists:seq(0, Out-1) ]).
 
+map(Func, List) ->
+    FOp = to_op(Func),
+    LOp = to_op(List),
+    FIn = ebb_prim:in_arity(FOp),
+    LOut = ebb_prim:out_arity(LOp),
+    case {LOut div FIn, LOut rem FIn} of
+	{N, 0} -> pipe(LOp, par(lists:duplicate(N, FOp)));
+	{_, _} -> erlang:error(badarg, [Func, List])
+    end.
+
+map(N, Func, List) ->
+    map(Func, pipe(to_op(List), split(N))).
+
+reduce(Func, List) ->
+    try
+	reduce_left(to_op(Func), to_op(List))
+    catch
+	throw:badarg -> erlang:error(badarg, [Func, List])
+    end.
+
+reduce(N, Func, List) ->
+    reduce(Func, pipe(to_op(List), split(N))).
+
+reduce_left(FOp, LOp) ->
+    FIn = ebb_prim:in_arity(FOp),
+    LOut = ebb_prim:out_arity(LOp),
+    case {LOut, LOut div FIn, LOut rem FIn} of
+	{0, 0, _} -> LOp;
+	{0, _, _} -> throw(badarg);
+	{_, 1, 0} -> pipe(LOp, FOp);
+	{_, N, 0} -> reduce_right(
+		       FOp, pipe(LOp, par(lists:duplicate(N, FOp))));
+	{_, N, M} -> reduce_right(
+		       FOp, pipe(LOp, par(lists:append(lists:duplicate(N, FOp),
+						       [id(M)]))))
+    end.
+
+reduce_right(FOp, LOp) ->
+    FIn = ebb_prim:in_arity(FOp),
+    LOut = ebb_prim:out_arity(LOp),
+    case {LOut, LOut div FIn, LOut rem FIn} of
+	{0, 0, _} -> LOp;
+	{0, _, _} -> throw(badarg);
+	{_, 1, 0} -> pipe(LOp, FOp);
+	{_, N, 0} -> reduce_left(
+		       FOp, pipe(LOp, par(lists:duplicate(N))));
+	{_, N, M} -> reduce_left(
+		       FOp, pipe(LOp, par([id(M) | lists:duplicate(N)])))
+    end.
+
+map_reduce(Map, Red, List) ->
+    reduce(Red, map(Map, List)).
+
+map_reduce(N, Map, Red, List) ->
+    reduce(Red, map(N, Map, List)).
+
 %%%-----------------------------------------------------------------------------
 %%% Sequencing
 %%%-----------------------------------------------------------------------------