|
@@ -17,9 +17,9 @@
|
|
|
|
|
|
%%% Parallelism
|
|
|
-export([fanout/1, fanout/2,
|
|
|
- map/2, map/3,
|
|
|
- reduce/2, reduce/3,
|
|
|
- map_reduce/3, map_reduce/4]).
|
|
|
+ map/2, map_split/3,
|
|
|
+ reduce/2, reduce/3, reduce_split/3,
|
|
|
+ map_reduce/3, map_reduce/4, map_reduce_split/4]).
|
|
|
|
|
|
%%% Sequencing
|
|
|
-export([seq/1, seq/2]).
|
|
@@ -158,20 +158,32 @@ map(Func, List) ->
|
|
|
{_, _} -> erlang:error(badarg, [Func, List])
|
|
|
end.
|
|
|
|
|
|
-map(N, Func, List) ->
|
|
|
+map_split(N, Func, List) ->
|
|
|
map(Func, pipe(to_op(List), split(N))).
|
|
|
|
|
|
reduce(Func, List) ->
|
|
|
try
|
|
|
- reduce_left(to_op(Func), to_op(List))
|
|
|
+ reduce_right(to_op(Func), to_op(List))
|
|
|
catch
|
|
|
throw:badarg -> erlang:error(badarg, [Func, List])
|
|
|
end.
|
|
|
|
|
|
-reduce(N, Func, List) ->
|
|
|
+reduce(Func, Z, List) ->
|
|
|
+ try
|
|
|
+ ZOp = to_op(Z),
|
|
|
+ case ebb_prim:in_arity(ZOp) of
|
|
|
+ 0 -> ok;
|
|
|
+ _ -> throw(badarg)
|
|
|
+ end,
|
|
|
+ reduce_right(to_op(Func), ZOp, to_op(List))
|
|
|
+ catch
|
|
|
+ throw:badarg -> erlang:error(badarg, [Func, List])
|
|
|
+ end.
|
|
|
+
|
|
|
+reduce_split(N, Func, List) ->
|
|
|
reduce(Func, pipe(to_op(List), split(N))).
|
|
|
|
|
|
-reduce_left(FOp, LOp) ->
|
|
|
+reduce_right(FOp, LOp) ->
|
|
|
FIn = ebb_prim:in_arity(FOp),
|
|
|
LOut = ebb_prim:out_arity(LOp),
|
|
|
case {LOut, LOut div FIn, LOut rem FIn} of
|
|
@@ -181,28 +193,38 @@ reduce_left(FOp, LOp) ->
|
|
|
{_, N, 0} -> reduce_right(
|
|
|
FOp, pipe(LOp, par(lists:duplicate(N, FOp))));
|
|
|
{_, N, M} -> reduce_right(
|
|
|
- FOp, pipe(LOp, par(lists:append(lists:duplicate(N, FOp),
|
|
|
- [id(M)]))))
|
|
|
+ FOp, pipe(LOp, par([id(M) | lists:duplicate(N, FOp)])))
|
|
|
end.
|
|
|
|
|
|
-reduce_right(FOp, LOp) ->
|
|
|
+reduce_right(FOp, ZOp, LOp) ->
|
|
|
FIn = ebb_prim:in_arity(FOp),
|
|
|
LOut = ebb_prim:out_arity(LOp),
|
|
|
- case {LOut, LOut div FIn, LOut rem FIn} of
|
|
|
- {0, 0, _} -> LOp;
|
|
|
- {_, 0, _} -> throw(badarg);
|
|
|
- {_, 1, 0} -> pipe(LOp, FOp);
|
|
|
- {_, N, 0} -> reduce_left(
|
|
|
- FOp, pipe(LOp, par(lists:duplicate(N, FOp))));
|
|
|
- {_, N, M} -> reduce_left(
|
|
|
- FOp, pipe(LOp, par([id(M) | lists:duplicate(N, FOp)])))
|
|
|
+ ZOut = ebb_prim:out_arity(ZOp),
|
|
|
+ N = LOut div FIn,
|
|
|
+ Rem = LOut rem FIn,
|
|
|
+ Need = FIn - Rem,
|
|
|
+ case {LOut, N, Rem, Need div ZOut, Need rem ZOut} of
|
|
|
+ {0, 0, _, _, _} -> LOp;
|
|
|
+ {_, 0, _, Z, 0} -> pipe(par([LOp | lists:duplicate(Z, ZOp)]), FOp);
|
|
|
+ {_, 1, 0, _, _} -> pipe(LOp, FOp);
|
|
|
+ {_, N, 0, _, _} -> reduce_right(
|
|
|
+ FOp, ZOp,
|
|
|
+ pipe(LOp, par(lists:duplicate(N, FOp))));
|
|
|
+ {_, N, _, Z, 0} -> reduce_right(
|
|
|
+ FOp, ZOp,
|
|
|
+ pipe(par([LOp | lists:duplicate(Z, ZOp)]),
|
|
|
+ par(lists:duplicate(N+1, FOp))));
|
|
|
+ {_, _, _, _, _} -> throw(badarg)
|
|
|
end.
|
|
|
|
|
|
map_reduce(Map, Red, List) ->
|
|
|
reduce(Red, map(Map, List)).
|
|
|
|
|
|
-map_reduce(N, Map, Red, List) ->
|
|
|
- reduce(Red, map(N, Map, List)).
|
|
|
+map_reduce(Map, Red, Z, List) ->
|
|
|
+ reduce(Red, Z, map(Map, List)).
|
|
|
+
|
|
|
+map_reduce_split(N, Map, Red, List) ->
|
|
|
+ reduce(Red, map_split(N, Map, List)).
|
|
|
|
|
|
%%%-----------------------------------------------------------------------------
|
|
|
%%% Sequencing
|