浏览代码

Forgot to add new files with the last commit.

Paul Downen 13 年之前
父节点
当前提交
ed3ee17dc7
共有 6 个文件被更改,包括 653 次插入0 次删除
  1. 227 0
      src/ebb_operation_fsm.erl
  2. 160 0
      src/ebb_work_manager.erl
  3. 33 0
      src/ebb_worker.erl
  4. 71 0
      src/ebb_worker_bridge.erl
  5. 46 0
      src/ebb_worker_supv.erl
  6. 116 0
      src/priority_queue.erl

+ 227 - 0
src/ebb_operation_fsm.erl

@@ -0,0 +1,227 @@
+-module(ebb_operation_fsm).
+
+-behaviour(gen_fsm).
+
+-include("../include/ebb_prim.hrl").
+
+%% API
+-export([start_link/1, start_link/2]).
+-export([send_in/2, send_in/3, return_out/1, cleanup/1]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+	 terminate/3, code_change/4]).
+
+%% state callbacks
+-export([running/2, running/3, finished/2, finished/3]).
+
+-define(DICT, orddict).
+
+-record(state, {mode, operation, out_arity, output, return_requests}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> ok,Pid} | ignore | {error,Error}
+%% Description:Creates a gen_fsm process which calls Module:init/1 to
+%% initialize. To ensure a synchronized start-up procedure, this function
+%% does not return until Module:init/1 has returned.  
+%%--------------------------------------------------------------------
+start_link(Op) ->
+    start_link(Op, local).
+
+start_link(Op, Mode) ->
+    gen_fsm:start_link(?MODULE, {Op, Mode}, []).
+
+send_in(Op, N, Arg) ->
+    ebb_event:in(Op, N, Arg).
+
+send_in(Op, Args) ->
+    lists:foreach(fun({N, Arg}) -> send_in(Op, N, Arg) end,
+		  lists:zip(lists:seq(1, length(Args)), Args)).
+
+return_out(Op) ->
+    gen_fsm:sync_send_event(Op, return_out, infinity).
+
+cleanup(Op) ->
+    gen_fsm:send_event(Op, cleanup).
+
+%%====================================================================
+%% gen_fsm callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, StateName, State} |
+%%                         {ok, StateName, State, Timeout} |
+%%                         ignore                              |
+%%                         {stop, StopReason}                   
+%% Description:Whenever a gen_fsm is started using gen_fsm:start/[3,4] or
+%% gen_fsm:start_link/3,4, this function is called by the new process to 
+%% initialize. 
+%%--------------------------------------------------------------------
+init({Op, Mode}) ->
+    State = #state{mode=Mode, out_arity=ebb_prim:out_arity(Op),
+		   output=?DICT:new(), return_requests=[]},
+    case start_operation(Op, Mode, self()) of
+	{ok, Pid} -> {ok, running, State#state{operation=Pid}};
+	ignore -> {ok, running, State};
+	Error -> Error
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% state_name(Event, State) -> {next_state, NextStateName, NextState}|
+%%                             {next_state, NextStateName, 
+%%                                NextState, Timeout} |
+%%                             {stop, Reason, NewState}
+%% Description:There should be one instance of this function for each possible
+%% state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_event/2, the instance of this function with the same name as
+%% the current state name StateName is called to handle the event. It is also 
+%% called if a timeout occurs. 
+%%--------------------------------------------------------------------
+
+running({out, N, Val},
+	State = #state{out_arity=Arity, output=Output,
+		       return_requests=Requests})
+  when N > 0, N =< Arity ->
+    Output2 = ?DICT:store(N, Val, Output),
+    State2 = State#state{output=Output2},
+    case is_done(State2) of
+	true  -> send_out_requests(Requests, Output2),
+		 {next_state, finished, State2#state{return_requests=[]}};
+	false -> {next_state, running, State2}
+    end;
+running({in, N, Arg}, State = #state{operation=Pid}) ->
+    ebb_event:in(Pid, N, Arg),
+    {next_state, running, State};
+running(cleanup, State) ->
+    {next_state, running, State}.
+
+finished(cleanup, State) ->
+    {stop, normal, State}.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% state_name(Event, From, State) -> {next_state, NextStateName, NextState} |
+%%                                   {next_state, NextStateName, 
+%%                                     NextState, Timeout} |
+%%                                   {reply, Reply, NextStateName, NextState}|
+%%                                   {reply, Reply, NextStateName, 
+%%                                    NextState, Timeout} |
+%%                                   {stop, Reason, NewState}|
+%%                                   {stop, Reason, Reply, NewState}
+%% Description: There should be one instance of this function for each
+%% possible state name. Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_event/2,3, the instance of this function with the same
+%% name as the current state name StateName is called to handle the event.
+%%--------------------------------------------------------------------
+
+running(return_out, From, State = #state{return_requests=Requests}) ->
+    State2 = State#state{return_requests=[From|Requests]},
+    {next_state, running, State2}.
+
+finished(return_out, _From, State = #state{output=Output}) ->
+    Reply = format_output(Output),
+    {reply, Reply, finished, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_event(Event, StateName, State) -> {next_state, NextStateName, 
+%%						  NextState} |
+%%                                          {next_state, NextStateName, 
+%%					          NextState, Timeout} |
+%%                                          {stop, Reason, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:send_all_state_event/2, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_sync_event(Event, From, StateName, 
+%%                   State) -> {next_state, NextStateName, NextState} |
+%%                             {next_state, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {reply, Reply, NextStateName, NextState}|
+%%                             {reply, Reply, NextStateName, NextState, 
+%%                              Timeout} |
+%%                             {stop, Reason, NewState} |
+%%                             {stop, Reason, Reply, NewState}
+%% Description: Whenever a gen_fsm receives an event sent using
+%% gen_fsm:sync_send_all_state_event/2,3, this function is called to handle
+%% the event.
+%%--------------------------------------------------------------------
+handle_sync_event(_Event, _From, StateName, State) ->
+    Reply = ok,
+    {reply, Reply, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: 
+%% handle_info(Info,StateName,State)-> {next_state, NextStateName, NextState}|
+%%                                     {next_state, NextStateName, NextState, 
+%%                                       Timeout} |
+%%                                     {stop, Reason, NewState}
+%% Description: This function is called by a gen_fsm when it receives any
+%% other message than a synchronous or asynchronous event
+%% (or a system message).
+%%--------------------------------------------------------------------
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, StateName, State) -> void()
+%% Description:This function is called by a gen_fsm when it is about
+%% to terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_fsm terminates with
+%% Reason. The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Function:
+%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+start_operation(Func = #func{}, Mode, Receiver) ->
+    ebb_func_fsm:start_link(Func, Mode, Receiver);
+start_operation(Value = #value{}, _Mode, Receiver) ->
+    ebb_value_fsm:start_link(Value, Receiver);
+start_operation(Pipe = #pipe{}, Mode, Receiver) ->
+    ebb_pipe_fsm:start_link(
+      Pipe, fun(Op, Ret) -> start_operation(Op, Mode, Ret) end, Receiver);
+start_operation(Par = #par{}, Mode, Receiver) ->
+    ebb_par_fsm:start_link(
+      Par, fun(Op, Ret) -> start_operation(Op, Mode, Ret) end, Receiver);
+start_operation(Route = #route{}, _Mode, Receiver) ->
+    ebb_route_fsm:start_link(Route, Receiver);
+start_operation(Sync = #sync{}, _Mode, Receiver) ->
+    ebb_sync_fsm:start_link(Sync, Receiver);
+start_operation(Split = #split{}, _Mode, Receiver) ->
+    ebb_split_fsm:start_link(Split, Receiver);
+start_operation(Merge = #merge{}, _Mode, Receiver) ->
+    ebb_merge_fsm:start_link(Merge, Receiver);
+start_operation(Switch = #switch{}, Mode, Receiver) ->
+    ebb_switch_fsm:start_link(
+      Switch, fun(Op, Ret) -> start_operation(Op, Mode, Ret) end, Receiver).
+
+
+is_done(#state{out_arity=Arity, output=Output}) ->
+    ?DICT:size(Output) == Arity.
+
+format_output(Output) ->
+    [ V || {_, V} <- lists:keysort(1, ?DICT:to_list(Output)) ].
+
+send_out_requests(Requests, Output) ->
+    Out = format_output(Output),
+    lists:foreach(fun(From) -> gen_fsm:reply(From, Out) end, Requests).

+ 160 - 0
src/ebb_work_manager.erl

@@ -0,0 +1,160 @@
+-module(ebb_work_manager).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0, start_link/1]).
+-export([start_work/3, work_finished/1,
+	 add_node/1, remove_node/1, scavenge/0,
+	 get_node_queue/0, get_node_list/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+	 terminate/2, code_change/3]).
+
+-define(SERVER, ebb_work_manager).
+
+-record(state, {node_queue}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link() ->
+    start_link([node()]).
+
+start_link(scavenge) ->
+    case gen_server:start_link({local, ?SERVER}, ?MODULE, [node()], []) of
+	{ok, Pid} ->
+	    scavenge(),
+	    {ok, Pid};
+	Error ->
+	    Error
+    end;
+start_link(Nodes) ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, Nodes, []).
+
+start_work(Func, Args, Receiver) ->
+    gen_server:call(?SERVER, {start_work, Func, Args, Receiver}).
+
+work_finished(Node) ->
+    gen_server:cast(?SERVER, {work_finished, Node}).
+
+add_node(Node) ->
+    gen_server:cast(?SERVER, {add_node, Node}).
+
+remove_node(Node) ->
+    gen_server:cast(?SERVER, {remove_node, Node}).
+
+scavenge() ->
+    gen_server:cast(?SERVER, scavenge).
+
+get_node_queue() ->
+    gen_server:call(?SERVER, get_node_queue).
+
+get_node_list() ->
+    priority_queue:to_list(get_node_queue()).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([]) ->
+    Queue = priority_queue:empty(),
+    {ok, #state{node_queue=Queue}};
+init(Nodes) ->
+    Queue = priority_queue:from_orddict([{0, Nodes}]),
+    {ok, #state{node_queue=Queue}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({start_work, Func, Args, Receiver}, _From,
+	    State = #state{node_queue=Queue}) ->
+    {Reply, Queue2} = start_work_from_queue(Func, Args, Receiver, Queue),
+    State2 = State#state{node_queue=Queue2},
+    {reply, Reply, State2};
+handle_call(get_node_queue, _From, State = #state{node_queue=Queue}) ->
+    Reply = Queue,
+    {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast({work_finished, Node}, State = #state{node_queue=Queue}) ->
+    Queue2 = priority_queue:modify_priority(Node, fun(P) -> P-1 end, Queue),
+    State2 = State#state{node_queue=Queue2},
+    {noreply, State2};
+handle_cast({add_node, Node}, State = #state{node_queue=Queue}) ->
+    Queue2 = priority_queue:insert(0, Node, Queue),
+    State2 = State#state{node_queue=Queue2},
+    {noreply, State2};
+handle_cast({remove_node, Node}, State = #state{node_queue=Queue}) ->
+    Queue2 = priority_queue:delete_value(Node, Queue),
+    State2 = State#state{node_queue=Queue2},
+    {noreply, State2};
+handle_cast(scavenge, State = #state{node_queue=Queue}) ->
+    % Scavenge nodes here
+    Queue2 = Queue,
+    State2 = State#state{node_queue=Queue2},
+    {noreply, State2}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+start_work_from_queue(Func, Args, Receiver, Queue) ->
+    {Priority, Node, Queue2} = priority_queue:take_minimum(Queue),
+    Reply = ebb_worker_bridge:start_link(Node, Func, Args, Receiver),
+    case Reply of
+	{ok, _} -> 
+	    Queue3 = priority_queue:insert(Priority+1, Node, Queue2),
+	    {Reply, Queue3};
+	_Error ->
+	    start_work_from_queue(Func, Args, Receiver, Queue2)
+    end.

+ 33 - 0
src/ebb_worker.erl

@@ -0,0 +1,33 @@
+-module(ebb_worker).
+
+-export([start_link/3, start_link/4, stop/1]).
+
+-export([worker/3, worker/4]).
+
+start_link(Func, Args, Receiver) ->
+    try
+	spawn_link(?MODULE, worker, [Func, Args, Receiver])
+    of
+	Pid -> {ok, Pid}
+    catch
+	Error -> Error
+    end.
+
+start_link(Node, Func, Args, Receiver) ->
+    try
+	spawn_link(Node, ?MODULE, worker, [Node, Func, Args, Receiver])
+    of
+	Pid -> {ok, Pid}
+    catch
+	Error -> Error
+    end.
+
+worker(Func, Args, Receiver) ->
+    ebb_event:return(Receiver, apply(Func, Args)).
+
+worker(Node, Func, Args, Receiver) ->
+    ebb_event:return(Receiver, apply(Func, Args)),
+    ebb_work_manager:work_finished(Node).
+
+stop(Worker) ->
+    exit(Worker, kill).

+ 71 - 0
src/ebb_worker_bridge.erl

@@ -0,0 +1,71 @@
+-module(ebb_worker_bridge).
+
+-behaviour(supervisor_bridge).
+
+%% API
+-export([start_link/3, start_link/4]).
+
+%% supervisor_bridge callbacks
+-export([init/1, terminate/2]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {mode, worker}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the supervisor bridge
+%%--------------------------------------------------------------------
+start_link(Func, Args, Receiver) ->
+    supervisor_bridge:start_link(?MODULE, {Func, Args, Receiver}).
+
+start_link(Node, Func, Args, Receiver) ->
+    supervisor_bridge:start_link(?MODULE, {Node, Func, Args, Receiver}).
+
+%%====================================================================
+%% supervisor_bridge callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Funcion: init(Args) -> {ok,  Pid, State} |
+%%                        ignore            |
+%%                        {error, Reason}    
+%% Description:Creates a supervisor_bridge process, linked to the calling
+%% process, which calls Module:init/1 to start the subsystem. To ensure a
+%% synchronized start-up procedure, this function does not return until
+%% Module:init/1 has returned. 
+%%--------------------------------------------------------------------
+init({Func, Args, Receiver}) ->
+    case ebb_worker:start_link(Func, Args, Receiver) of
+	{ok, Pid} ->
+	    {ok, Pid, #state{mode=local, worker=Pid}};
+	Error ->
+	    Error
+    end;
+init({Node, Func, Args, Receiver}) ->
+    case ebb_worker:start_link(Node, Func, Args, Receiver) of
+	{ok, Pid} ->
+	    {ok, Pid, #state{mode={distributed, Node}, worker=Pid}};
+	Error ->
+	    Error
+    end.
+
+%%--------------------------------------------------------------------
+%% Func: terminate(Reason, State) -> void()
+%% Description:This function is called by the supervisor_bridge when it is
+%% about to terminate. It should be the opposite of Module:init/1 and stop
+%% the subsystem and do any necessary cleaning up.The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(normal, _State) ->
+    ok;
+terminate(noconnection, #state{mode={distributed, Node}}) ->
+    ebb_work_manager:remove_node(Node),
+    exit(noconnection);
+terminate(Reason, _State) ->
+    exit(Reason).
+
+%%====================================================================
+%% Internal functions
+%%====================================================================

+ 46 - 0
src/ebb_worker_supv.erl

@@ -0,0 +1,46 @@
+-module(ebb_worker_supv).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/4]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%%====================================================================
+%% API functions
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the supervisor
+%%--------------------------------------------------------------------
+start_link(Mode, Func, Args, Receiver) ->
+    supervisor:start_link(?MODULE, {Mode, Func, Args, Receiver}).
+
+%%====================================================================
+%% Supervisor callbacks
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Func: init(Args) -> {ok,  {SupFlags,  [ChildSpec]}} |
+%%                     ignore                          |
+%%                     {error, Reason}
+%% Description: Whenever a supervisor is started using 
+%% supervisor:start_link/[2,3], this function is called by the new process 
+%% to find out about restart strategy, maximum restart frequency and child 
+%% specifications.
+%%--------------------------------------------------------------------
+init({local, Func, Args, Receiver}) ->
+    Worker = {'worker',
+	      {ebb_worker_bridge, start_link, [Func, Args, Receiver]},
+	      transient, 2000, supervisor, [ebb_worker_bridge]},
+    {ok, {{one_for_one, 5, 1}, [Worker]}};
+init({distributed, Func, Args, Receiver}) ->
+    Worker = {'worker',
+	      {ebb_work_manager, start_work, [Func, Args, Receiver]},
+	      transient, 2000, supervisor, [ebb_work_manager]},
+    {ok, {{one_for_one, 5, 1}, [Worker]}}.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================

+ 116 - 0
src/priority_queue.erl

@@ -0,0 +1,116 @@
+-module(priority_queue).
+
+-export([empty/0, is_empty/1, size/1, values/1, priorities/1]).
+-export([to_list/1, to_orddict/1, from_list/1, from_orddict/1, map_values/2]).
+-export([insert/3, insert_many/3]).
+-export([find_value/2, delete_value/2, take_value/2, modify_priority/3]).
+-export([minimum/1, take_minimum/1, maximum/1, take_maximum/1]).
+
+empty() ->
+    gb_trees:empty().
+
+is_empty(Queue) ->
+    gb_trees:is_empty(Queue).
+
+size(Queue) ->
+    gb_trees:size(Queue).
+
+values(Queue) ->
+    lists:append(gb_trees:values(Queue)).
+
+priorities(Queue) ->
+    gb_trees:keys(Queue).
+
+to_orddict(Queue) ->
+    gb_trees:to_list(Queue).
+
+to_list(Queue) ->
+    lists:append([ [ {Priority, Val} || Val <- Vals ]
+		   || {Priority, Vals} <- gb_trees:to_list(Queue) ]).
+
+from_orddict(List) ->
+    gb_trees:from_orddict(List).
+
+from_list(List) ->
+    lists:foldl(fun({Priority, Val}, Queue) -> insert(Priority, Val, Queue) end,
+		empty(), List).
+
+map_values(Func, Queue) ->
+    gb_trees:map(fun(_Key, Vals) -> lists:map(Func, Vals) end, Queue).
+
+insert(Priority, Val, Queue) ->
+    case gb_trees:lookup(Priority, Queue) of
+	none ->
+	    gb_trees:insert(Priority, [Val], Queue);
+	{value, Prev} ->
+	    gb_trees:update(Priority, [Val|Prev], Queue)
+    end.
+
+insert_many(Priority, Vals, Queue) when is_list(Vals) ->
+    case gb_trees:lookup(Priority, Queue) of
+	none ->
+	    gb_trees:insert(Priority, Vals, Queue);
+	{value, Prev} ->
+	    gb_trees:update(Priority, Vals++Prev, Queue)
+    end.
+
+find_value(Val, Queue) ->
+    case [ {P, V} || {P, Vs} <- to_orddict(Queue), V <- Vs, V == Val ] of
+	[{Priority, Val}] -> {Priority, Val};
+	[] -> none
+    end.
+
+delete_value(Val, Queue) ->
+    map_values(fun(Vs) -> [ V || V <- Vs, V =/= Val ] end, Queue).
+
+take_value(Val, Queue) ->
+    case [ {P, V, Vs} || {P, Vs} <- to_orddict(Queue), V <- Vs, V == Val ] of
+	[{Priority, Val, Vals}] ->
+	    Queue2 = case [ V || V <- Vals, V =/= Val ] of
+			 [] -> gb_trees:delete(Priority, Queue);
+			 Vals2 -> gb_trees:update(Priority, Vals2, Queue)
+		     end,
+	    {Priority, Val, Queue2};
+	[] -> none
+    end.
+
+modify_priority(Val, Func, Queue) when is_function(Func) ->
+    case take_value(Val, Queue) of
+	{Priority, Val, Queue2} ->
+	    insert(Func(Priority), Val, Queue2);
+	none ->
+	    Queue
+    end.
+
+minimum(Queue) ->
+    case gb_trees:smallest(Queue) of
+	{Priority, [Val|_]} -> {Priority, Val};
+	{Priority, []} -> minimum(gb_trees:delete(Priority, Queue))
+    end.
+
+maximum(Queue) ->
+    case gb_trees:largest(Queue) of
+	{Priority, [Val|_]} -> {Priority, Val};
+	{Priority, []} -> maximum(gb_trees:delete(Priority, Queue))
+    end.
+
+take_minimum(Queue) ->
+    case gb_trees:take_smallest(Queue) of
+	{_Priority, [], Queue2} ->
+	    take_minimum(Queue2);
+	{Priority, [Val], Queue2} ->
+	    {Priority, Val, Queue2};
+	{Priority, [Val|Vals], Queue2} ->
+	    {Priority, Val, gb_trees:insert(Priority, Vals, Queue2)}
+    end.
+
+take_maximum(Queue) ->
+    case gb_trees:take_largest(Queue) of
+	{_Priority, [], Queue2} ->
+	    take_maximum(Queue2);
+	{Priority, [Val], Queue2} ->
+	    {Priority, Val, Queue2};
+	{Priority, [Val|Vals], Queue2} ->
+	    {Priority, Val, gb_trees:insert(Priority, Vals, Queue2)}
+    end.
+