Browse Source

Reorganized control flow module structure.

Paul Downen 13 years ago
parent
commit
cbd5dbb97f
6 changed files with 134 additions and 252 deletions
  1. 0 0
      include/ebb_prim.hrl
  2. 0 19
      src/ebb_loop.erl
  3. 0 41
      src/ebb_par.erl
  4. 0 24
      src/ebb_pipe.erl
  5. 134 23
      src/ebb_prim.erl
  6. 0 145
      src/flow_graph.erl

include/flow_graph.hrl → include/ebb_prim.hrl


+ 0 - 19
src/ebb_loop.erl

@@ -1,19 +0,0 @@
--module(ebb_loop).
--export([loopback/1, state/2]).
-
-loopback(Op) ->
-    N = flow_graph:in_arity(Op),
-    Block = lists:duplicate(N,0),
-    Pass = lists:seq(2,N+1),
-    flow_graph:loop(
-      ebb_prim:id(N),
-      flow_graph:pipe(
-	[Op,
-	 flow_graph:split(N+1),
-	 flow_graph:switch(
-	   flow_graph:route(N+1, [1]),
-	   [{loop, flow_graph:route(N+1, lists:append(Block,Pass))},
-	    {stop, flow_graph:route(N+1, lists:append(Pass,Block))}])])).
-
-state(Init, Op) ->
-    flow_graph:loop(Init, Op).

+ 0 - 41
src/ebb_par.erl

@@ -1,41 +0,0 @@
--module(ebb_par).
--export([scatter/2, scatter/1,
-	 parallel/2, parallel/1,
-	 fanout/2, fanout/1,
-	 funnel/2, funnel/1]).
-
-scatter(Task1, Task2) ->
-    flow_graph:parallel([Task1, Task2]).
-
-scatter([Task]) ->
-    Task;
-scatter(Tasks = [_|_]) ->
-    flow_graph:parallel(Tasks).
-
-parallel(Op1, Op2) ->
-    flow_graph:parallel([Op1, Op2]).
-
-parallel([Op]) ->
-    Op;
-parallel(Ops = [_|_]) ->
-    flow_graph:parallel(Ops).
-
-fanout(Op1, Op2) ->
-    flow_graph:pipe([flow_graph:route(1, [1, 1]),
-		     flow_graph:parallel([Op1, Op2])]).
-
-fanout([Op]) ->
-    Op;
-fanout(Ops = [_|_]) ->
-    flow_graph:pipe([flow_graph:route(1, lists:duplicate(length(Ops), 1)),
-		     flow_graph:parallel(Ops)]).
-
-funnel(Op1, Op2) ->
-    flow_graph:pipe([flow_graph:parallel([Op1, Op2]),
-		     flow_graph:funnel(2)]).
-
-funnel([Op]) ->
-    Op;
-funnel(Ops = [_|_]) ->
-    flow_graph:pipe([flow_graph:parallel(Ops),
-		     flow_graph:funnel(length(Ops))]).

+ 0 - 24
src/ebb_pipe.erl

@@ -1,24 +0,0 @@
--module(ebb_pipe).
--export([pipe/2, pipe/1, bind/2, feed/2, sequence/2, sequence/1]).
-
-pipe(Op1, Op2) ->
-    flow_graph:pipe([Op1, Op2]).
-
-pipe([Op]) ->
-    Op;
-pipe(Ops = [_|_]) ->
-    flow_graph:pipe(Ops).
-
-bind(Task, Gen) ->
-    feed(Task, ebb_prim:dynamic(Gen)).
-
-feed(Task, Op) ->
-    flow_graph:pipe([Task, Op]).
-
-sequence(Task1, Task2) ->
-    flow_graph:sequence([Task1, Task2]).
-
-sequence([Task]) ->
-    Task;
-sequence(Tasks = [_|_]) ->
-    flow_graph:sequence(Tasks).

+ 134 - 23
src/ebb_prim.erl

@@ -1,34 +1,145 @@
 -module(ebb_prim).
--export([task/2, task/1, return/1,
-	 func/1, app/2, dynamic/1,
-	 ignore/1, nop/0, id/0, id/1]).
+-include("../include/ebb_prim.hrl").
 
-task(Func, Arg) ->
-    app(dynamic(Func), return(Arg)).
+%%% Operation construction
+-export([func/1, value/1, values/1, dynamic/1, route/2,
+	 pipe/1, parallel/1, sequence/1, funnel/1,
+	 split/1, merge/1, switch/2, loop/2]).
 
-task(Thunk) ->
-    flow_graph:dynamic(Thunk).
+%%% Operation querying
+-export([in_arity/1, out_arity/1, can_connect/2]).
 
-return(Val) ->
-    flow_graph:value(Val).
+%%%-----------------------------------------------------------------------------
+%%% Operation construction
+%%%-----------------------------------------------------------------------------
 
-func(Func) ->
-    flow_graph:func(Func).
+func(F) when is_function(F) ->
+    {arity, N} = erlang:fun_info(F, arity),
+    #func{in=N, code=F}.
 
-app(Op, Arg) ->
-    flow_graph:pipe(return(Arg), Op).
 
-dynamic(Gen) ->
-    flow_graph:dynamic(Gen).
+value(X) -> #value{value=X}.
 
-ignore(Task) ->
-    flow_graph:sequence([flow_graph:route(1, []), Task]).
+values(Xs = [_|_]) ->
+    parallel(lists:map(fun value/1, Xs)).
 
-nop() ->
-    flow_graph:value('nop').
+dynamic(Gen) when is_function(Gen) ->
+    {arity, N} = erlang:fun_info(Gen, arity),
+    #dynamic{in=N, code=Gen}.
 
-id() ->
-    flow_graph:route(1, [1]).
+route(N, Map) ->
+    try
+	lists:foldl(fun(Target, Count) when Target =< N -> Count+1;
+		       (Target, _Count) when Target > N -> throw(out_of_bounds)
+		    end,
+		    0, Map)
+    of 
+	Count -> #route{in=N, out=Count, map=Map}
+    catch
+	throw:out_of_bounds -> erlang:error(badarg, [N, Map])
+    end.
 
-id(N) ->
-    flow_graph:route(N, lists:seq(1,N)).
+pipe(Ops = [First|Rest]) ->
+    try 
+	lists:foldl(fun(Op2, Op1) ->
+			       case can_connect(Op1, Op2) of
+				   true  -> Op2;
+				   false -> throw(bad_connection)
+			       end
+		    end,
+		    First, Rest)
+    of
+	Last -> #pipe{in=in_arity(First), out=out_arity(Last), ops=Ops}
+    catch
+	throw:bad_connection -> erlang:error(badarg, [Ops])
+    end.
+
+parallel(Ops = [_|_]) ->
+    {In, Out} = flatten_arity(Ops),
+    #parallel{in=In, out=Out, ops=Ops}.
+
+sequence(Ops = [_|_]) ->
+    {In, Out} = flatten_arity(Ops),
+    #sequence{in=In, out=Out, ops=Ops}.
+
+split(N) -> #split{size=N}.
+
+merge(N) -> #merge{size=N}.
+
+funnel(N) -> #funnel{size=N}.
+
+switch(Switch, Map = [{_, Op1}|Rest]) ->
+    try
+	{I, O} = lists:foldl(
+		   fun({_, Op}, {In, Out}) -> known_op_arity(Op, In, Out) end,
+		   {in_arity(Op1), out_arity(Op1)}, Rest),
+	known_op_arity(Switch, I, 1),
+	{I, O}
+    of
+	{In, Out} -> #switch{in=In, out=Out, switch=Switch, map=Map}
+    catch
+	error:function_clause -> erlang:error(badarg, [Switch, Map])
+    end.
+
+loop(Init, Op) ->
+    S = out_arity(Init),
+    {In, Out} = case {in_arity(Op)-S, out_arity(Op)-S} of
+		    {I, O} when I >= 0, O > 0 -> {I+in_arity(Init), O};
+		    {_, _} -> erlang:error(badarg, [Init, Op])
+		end,
+    #loop{in=In, out=Out, init=Init, op=Op}.
+
+%%%-----------------------------------------------------------------------------
+%%% Operation querying
+%%%-----------------------------------------------------------------------------
+
+in_arity(#func{in=In})     -> In;
+in_arity(#value{})         -> 0;
+in_arity(#dynamic{in=In})  -> In;
+in_arity(#route{in=In})    -> In;
+in_arity(#pipe{in=In})     -> In;
+in_arity(#parallel{in=In}) -> In;
+in_arity(#sequence{in=In}) -> In;
+in_arity(#split{})         -> 1;
+in_arity(#merge{size=N})   -> N;
+in_arity(#funnel{size=N})  -> N;
+in_arity(#switch{in=In})   -> In;
+in_arity(#loop{in=In})     -> In.
+
+out_arity(#func{})            -> 1;
+out_arity(#value{})           -> 1;
+out_arity(#dynamic{})         -> unknown;
+out_arity(#route{out=Out})    -> Out;
+out_arity(#pipe{out=Out})     -> Out;
+out_arity(#parallel{out=Out}) -> Out;
+out_arity(#sequence{out=Out}) -> Out;
+out_arity(#split{size=N})     -> N;
+out_arity(#merge{})           -> 1;
+out_arity(#funnel{})          -> 1;
+out_arity(#switch{out=Out})   -> Out;
+out_arity(#loop{out=Out})     -> Out.
+
+flatten_arity([First|Rest]) ->
+    lists:foldl(fun(Op, {In, Out}) ->
+			{add_arity(In, in_arity(Op)),
+			 add_arity(Out, out_arity(Op))}
+	  end,
+	  {in_arity(First), out_arity(First)}, Rest).
+
+add_arity(unknown, _) -> unknown;
+add_arity(_, unknown) -> unknown;
+add_arity(N, M)       -> N + M.
+
+known_arity(N, N)       -> N;
+known_arity(unknown, N) -> N;
+known_arity(N, unknown) -> N.
+
+known_op_arity(Op, In, Out) ->
+    {known_arity(in_arity(Op), In), known_arity(out_arity(Op), Out)}.
+
+can_connect(Op1, Op2) ->
+    case {out_arity(Op1), in_arity(Op2)} of
+	{N,       N} -> true;
+	{unknown, _} -> true;
+	_            -> false
+    end.

+ 0 - 145
src/flow_graph.erl

@@ -1,145 +0,0 @@
--module(flow_graph).
--include("../include/flow_graph.hrl").
-
-%%% Operation construction
--export([func/1, value/1, values/1, dynamic/1, route/2,
-	 pipe/1, parallel/1, sequence/1, funnel/1,
-	 split/1, merge/1, switch/2, loop/2]).
-
-%%% Operation querying
--export([in_arity/1, out_arity/1, can_connect/2]).
-
-%%%-----------------------------------------------------------------------------
-%%% Operation construction
-%%%-----------------------------------------------------------------------------
-
-func(F) when is_function(F) ->
-    {arity, N} = erlang:fun_info(F, arity),
-    #func{in=N, code=F}.
-
-
-value(X) -> #value{value=X}.
-
-values(Xs = [_|_]) ->
-    parallel(lists:map(fun value/1, Xs)).
-
-dynamic(Gen) when is_function(Gen) ->
-    {arity, N} = erlang:fun_info(Gen, arity),
-    #dynamic{in=N, code=Gen}.
-
-route(N, Map) ->
-    try
-	lists:foldl(fun(Target, Count) when Target =< N -> Count+1;
-		       (Target, _Count) when Target > N -> throw(out_of_bounds)
-		    end,
-		    0, Map)
-    of 
-	Count -> #route{in=N, out=Count, map=Map}
-    catch
-	throw:out_of_bounds -> erlang:error(badarg, [N, Map])
-    end.
-
-pipe(Ops = [First|Rest]) ->
-    try 
-	lists:foldl(fun(Op2, Op1) ->
-			       case can_connect(Op1, Op2) of
-				   true  -> Op2;
-				   false -> throw(bad_connection)
-			       end
-		    end,
-		    First, Rest)
-    of
-	Last -> #pipe{in=in_arity(First), out=out_arity(Last), ops=Ops}
-    catch
-	throw:bad_connection -> erlang:error(badarg, [Ops])
-    end.
-
-parallel(Ops = [_|_]) ->
-    {In, Out} = flatten_arity(Ops),
-    #parallel{in=In, out=Out, ops=Ops}.
-
-sequence(Ops = [_|_]) ->
-    {In, Out} = flatten_arity(Ops),
-    #sequence{in=In, out=Out, ops=Ops}.
-
-split(N) -> #split{size=N}.
-
-merge(N) -> #merge{size=N}.
-
-funnel(N) -> #funnel{size=N}.
-
-switch(Switch, Map = [{_, Op1}|Rest]) ->
-    try
-	{I, O} = lists:foldl(
-		   fun({_, Op}, {In, Out}) -> known_op_arity(Op, In, Out) end,
-		   {in_arity(Op1), out_arity(Op1)}, Rest),
-	known_op_arity(Switch, I, 1),
-	{I, O}
-    of
-	{In, Out} -> #switch{in=In, out=Out, switch=Switch, map=Map}
-    catch
-	error:function_clause -> erlang:error(badarg, [Switch, Map])
-    end.
-
-loop(Init, Op) ->
-    S = out_arity(Init),
-    {In, Out} = case {in_arity(Op)-S, out_arity(Op)-S} of
-		    {I, O} when I >= 0, O > 0 -> {I+in_arity(Init), O};
-		    {_, _} -> erlang:error(badarg, [Init, Op])
-		end,
-    #loop{in=In, out=Out, init=Init, op=Op}.
-
-%%%-----------------------------------------------------------------------------
-%%% Operation querying
-%%%-----------------------------------------------------------------------------
-
-in_arity(#func{in=In})     -> In;
-in_arity(#value{})         -> 0;
-in_arity(#dynamic{in=In})  -> In;
-in_arity(#route{in=In})    -> In;
-in_arity(#pipe{in=In})     -> In;
-in_arity(#parallel{in=In}) -> In;
-in_arity(#sequence{in=In}) -> In;
-in_arity(#split{})         -> 1;
-in_arity(#merge{size=N})   -> N;
-in_arity(#funnel{size=N})  -> N;
-in_arity(#switch{in=In})   -> In;
-in_arity(#loop{in=In})     -> In.
-
-out_arity(#func{})            -> 1;
-out_arity(#value{})           -> 1;
-out_arity(#dynamic{})         -> unknown;
-out_arity(#route{out=Out})    -> Out;
-out_arity(#pipe{out=Out})     -> Out;
-out_arity(#parallel{out=Out}) -> Out;
-out_arity(#sequence{out=Out}) -> Out;
-out_arity(#split{size=N})     -> N;
-out_arity(#merge{})           -> 1;
-out_arity(#funnel{})          -> 1;
-out_arity(#switch{out=Out})   -> Out;
-out_arity(#loop{out=Out})     -> Out.
-
-flatten_arity([First|Rest]) ->
-    lists:foldl(fun(Op, {In, Out}) ->
-			{add_arity(In, in_arity(Op)),
-			 add_arity(Out, out_arity(Op))}
-	  end,
-	  {in_arity(First), out_arity(First)}, Rest).
-
-add_arity(unknown, _) -> unknown;
-add_arity(_, unknown) -> unknown;
-add_arity(N, M)       -> N + M.
-
-known_arity(N, N)       -> N;
-known_arity(unknown, N) -> N;
-known_arity(N, unknown) -> N.
-
-known_op_arity(Op, In, Out) ->
-    {known_arity(in_arity(Op), In), known_arity(out_arity(Op), Out)}.
-
-can_connect(Op1, Op2) ->
-    case {out_arity(Op1), in_arity(Op2)} of
-	{N,       N} -> true;
-	{unknown, _} -> true;
-	_            -> false
-    end.