Advanced Control Flow#
This guide explains how to use conditional edges to create branching logic in your Orkes graphs. This is a powerful feature that allows you to build complex, agent-like behaviors.
1. The Gate Function#
Conditional edges work by using a “gate function”. This is a simple Python function that takes the current state and returns a string. The string returned by the gate function determines which path the graph execution will take.
from typing import TypedDict
class NumberState(TypedDict):
number: int
def is_even_or_odd(state: NumberState) -> str:
if state['number'] % 2 == 0:
return 'even'
else:
return 'odd'
2. The Conditional Edge#
You create a conditional edge using the add_conditional_edge method. This method takes four arguments:
- The name of the source node.
- The gate function.
- A dictionary that maps the possible return values of the gate function to the names of the destination nodes.
- (Optional) A default destination node, if none of the keys in the dictionary match the return value of the gate function.
graph TD
subgraph Conditional Edge
A[set_number] --> B{is_even_or_odd};
B -- "returns 'even'" --> C[process_even];
B -- "returns 'odd'" --> D[process_odd];
end
from orkes.graph.core import OrkesGraph
# ... (NumberState and is_even_or_odd definition)
graph = OrkesGraph(NumberState)
# Assume 'process_even' and 'process_odd' are nodes you have already defined
# graph.add_node('process_even', process_even_node)
# graph.add_node('process_odd', process_odd_node)
# Add a node that sets the number
def set_number_node(state: NumberState) -> NumberState:
state['number'] = 10
return state
graph.add_node('set_number', set_number_node)
# Add the conditional edge
graph.add_conditional_edge(
'set_number',
is_even_or_odd,
{
'even': 'process_even',
'odd': 'process_odd'
}
)
3. Looping#
You can create loops by routing a conditional edge back to a previous node in the graph. Orkes has a built-in mechanism to prevent infinite loops. The GraphRunner has a max_passes parameter (defaulting to 10) that will stop the execution if the graph runs for too many steps.
graph TD
subgraph Looping
A[increment_node] --> B{counter_gate};
B -- "returns 'continue'" --> A;
B -- "returns 'finish'" --> C[END];
end
# Example of a loop
# ... (graph and other nodes)
def counter_gate(state: CounterState) -> str:
if state['count'] < 5:
return 'continue'
else:
return 'finish'
graph.add_conditional_edge(
'increment_node',
counter_gate,
{
'continue': 'increment_node', # Edge back to the same node
'finish': graph.END
}
)
4. Parallel Execution#
Orkes supports parallel execution of different branches in the graph, a pattern also known as a fan-out/fan-in strategy.
Fan-Out: The graph “fans out” from a single node to execute multiple branches concurrently.
Fan-In: The parallel branches “fan in” to a single aggregation node, ensuring the main execution path only continues after all parallel work is complete.
graph TD
subgraph Fan-Out
A(start_node) --> B(branch_1);
A --> C(branch_2);
A --> E(...);
end
subgraph Fan-In
B --> D[aggregation_node];
C --> D;
E --> D;
end
You can implement this using the add_parallel_edges method, which splits the execution into multiple paths. All parallel branches must eventually converge into a single aggregation_node.
from orkes.graph.core import OrkesGraph
from typing import TypedDict
class ParallelState(TypedDict):
branch_1_visited: bool
branch_2_visited: bool
def branch_1_node(state: ParallelState) -> ParallelState:
state['branch_1_visited'] = True
return state
def branch_2_node(state: ParallelState) -> ParallelState:
state['branch_2_visited'] = True
return state
def aggregation_node(state: ParallelState) -> ParallelState:
# This node will only be reached after both branches complete
assert state['branch_1_visited']
assert state['branch_2_visited']
return state
graph = OrkesGraph(ParallelState)
graph.add_node('branch_1', branch_1_node)
graph.add_node('branch_2', branch_2_node)
graph.add_node('aggregator', aggregation_node)
graph.add_parallel_edges(
graph.START,
to_nodes=['branch_1', 'branch_2'],
aggregation_node='aggregator'
)
# Edges from parallel branches to the aggregation node
graph.add_edge('branch_1', 'aggregator')
graph.add_edge('branch_2', 'aggregator')
# Edge from the aggregation node to the end
graph.add_edge('aggregator', graph.END)