Parcourir la source

Generalized pipe/par permutation in ebb_run:simpl/1.

Made ebb_run:simpl_pipe and ebb_run:simpl_pipe_par better handle parallel
operations with generators (operations with no arguments) in the recieving
parallel operation.
Paul Downen il y a 14 ans
Parent
commit
3e2c1ce8e9
1 fichiers modifiés avec 29 ajouts et 19 suppressions
  1. 29 19
      src/ebb_run.erl

+ 29 - 19
src/ebb_run.erl

@@ -77,8 +77,8 @@ simpl_pipe(Ops) ->
 
 simpl_pipe(#pipe{ops=Ops}, Acc) ->
     lists:reverse(Ops, Acc);
-simpl_pipe(Par2 = #par{}, [Par1 = #par{} | Acc]) ->
-    simpl_pipe_par(Par1, Par2) ++ Acc;
+simpl_pipe(Par = #par{}, [Any | Acc]) ->
+    simpl_pipe_par(Any, Par) ++ Acc;
 simpl_pipe(R2 = #route{in=N}, Acc) ->
     case {R2 == ebb_flow:id(N), Acc} of
 	{true, _} -> Acc;
@@ -95,21 +95,27 @@ simpl_pipe(Op, Acc) ->
 
 simpl_pipe_par(Par1 = #par{ops=Ops1}, Par2 = #par{ops=Ops2}) ->
     case zip_pars(Ops1, Ops2) of
-	[{Ops1, Ops2}] ->
-	    [Par2, Par1];
-	Zip ->
-	    [ebb_prim:par(
-	       lists:flatten(
-		 [ case {O1, O2} of
-		       {[_|_],  [_|_]} ->
-			   [simpl(ebb_prim:pipe(
-				    [ flatten(ebb_prim:par(X))
-				      || X <- [O1, O2] ]))];
-		       {_, _} -> [O1, O2]
-		   end
-		   || {O1, O2} <- Zip ]))]
+	[{Ops1, Ops2}] -> [Par2, Par1];
+	Zip -> [permute_pipe_par(Zip)]
+    end;
+simpl_pipe_par(Any, Par = #par{ops=Ops}) ->
+    case zip_pars([Any], Ops) of
+	[{Any, Ops}] -> [Any, Par];
+	Zip -> [permute_pipe_par(Zip)]
     end.
 
+permute_pipe_par(Zip) ->
+    ebb_prim:par(
+      lists:flatten(
+	[ case {O1, O2} of
+	      {[_|_],  [_|_]} ->
+		  [simpl(ebb_prim:pipe(
+			   [ flatten(ebb_prim:par(X))
+			     || X <- [O1, O2] ]))];
+	      {_, _} -> [O1, O2]
+	  end
+	  || {O1, O2} <- Zip ])).
+
 zip_pars(Par1, Par2) ->
     zip_pars(1, Par1, [], 1, Par2, [], []).
 
@@ -124,17 +130,21 @@ zip_pars(_, Par1,       Prev1,
 zip_pars(M, _Par1 = [Op1|Rest1], Prev1,
 	 N, _Par2 = [Op2|Rest2], Prev2,
 	 Acc) ->
-    case {M+ebb_prim:out_arity(Op1), N+ebb_prim:in_arity(Op2)} of
-	{I, I} -> 
+    case {M+ebb_prim:out_arity(Op1), N+ebb_prim:in_arity(Op2), Prev2} of
+	{_, N, []} ->
+	    zip_pars(M, [Op1|Rest1], Prev1,
+		     N, Rest2, [],
+		     [ {[], Op2} | Acc ]);
+	{I, I, _} -> 
 	    zip_pars(I, Rest1, [],
 		     I, Rest2, [],
 		     [ {lists:reverse([Op1|Prev1]), lists:reverse([Op2|Prev2])}
 		       | Acc ]);
-	{M2, N2} when M2 < N2 ->
+	{M2, N2, _} when M2 < N2 ->
 	    zip_pars(M2, Rest1, [Op1|Prev1],
 		     N, [Op2|Rest2], Prev2,
 		     Acc);
-	{M2, N2} when N2 < M2 ->
+	{M2, N2, _} when N2 < M2 ->
 	    zip_pars(M, [Op1|Rest1], Prev1,
 		     N2, Rest2, [Op2|Prev2],
 		     Acc)