Advanced Control Flow#

This guide explains how to use conditional edges to create branching logic in your Orkes graphs. This is a powerful feature that allows you to build complex, agent-like behaviors.

1. The Gate Function#

Conditional edges work by using a “gate function”. This is a simple Python function that takes the current state and returns a string. The string returned by the gate function determines which path the graph execution will take.

from typing import TypedDict

class NumberState(TypedDict):
    number: int

def is_even_or_odd(state: NumberState) -> str:
    if state['number'] % 2 == 0:
        return 'even'
    else:
        return 'odd'

2. The Conditional Edge#

You create a conditional edge using the add_conditional_edge method. This method takes four arguments: - The name of the source node. - The gate function. - A dictionary that maps the possible return values of the gate function to the names of the destination nodes. - (Optional) A default destination node, if none of the keys in the dictionary match the return value of the gate function.

        graph TD
    subgraph Conditional Edge
        A[set_number] --> B{is_even_or_odd};
        B -- "returns 'even'" --> C[process_even];
        B -- "returns 'odd'" --> D[process_odd];
    end
    
from orkes.graph.core import OrkesGraph
# ... (NumberState and is_even_or_odd definition)

graph = OrkesGraph(NumberState)

# Assume 'process_even' and 'process_odd' are nodes you have already defined
# graph.add_node('process_even', process_even_node)
# graph.add_node('process_odd', process_odd_node)

# Add a node that sets the number
def set_number_node(state: NumberState) -> NumberState:
    state['number'] = 10
    return state
graph.add_node('set_number', set_number_node)


# Add the conditional edge
graph.add_conditional_edge(
    'set_number',
    is_even_or_odd,
    {
        'even': 'process_even',
        'odd': 'process_odd'
    }
)

3. Looping#

You can create loops by routing a conditional edge back to a previous node in the graph. Orkes has a built-in mechanism to prevent infinite loops. The GraphRunner has a max_passes parameter (defaulting to 10) that will stop the execution if the graph runs for too many steps.

        graph TD
    subgraph Looping
        A[increment_node] --> B{counter_gate};
        B -- "returns 'continue'" --> A;
        B -- "returns 'finish'" --> C[END];
    end
    
# Example of a loop
# ... (graph and other nodes)

def counter_gate(state: CounterState) -> str:
    if state['count'] < 5:
        return 'continue'
    else:
        return 'finish'

graph.add_conditional_edge(
    'increment_node',
    counter_gate,
    {
        'continue': 'increment_node', # Edge back to the same node
        'finish': graph.END
    }
)

4. Parallel Execution#

Orkes supports parallel execution of different branches in the graph, a pattern also known as a fan-out/fan-in strategy.

  • Fan-Out: The graph “fans out” from a single node to execute multiple branches concurrently.

  • Fan-In: The parallel branches “fan in” to a single aggregation node, ensuring the main execution path only continues after all parallel work is complete.

        graph TD
    subgraph Fan-Out
        A(start_node) --> B(branch_1);
        A --> C(branch_2);
        A --> E(...);
    end
    subgraph Fan-In
        B --> D[aggregation_node];
        C --> D;
        E --> D;
    end
    

You can implement this using the add_parallel_edges method, which splits the execution into multiple paths. All parallel branches must eventually converge into a single aggregation_node.

from orkes.graph.core import OrkesGraph
from typing import TypedDict

class ParallelState(TypedDict):
    branch_1_visited: bool
    branch_2_visited: bool

def branch_1_node(state: ParallelState) -> ParallelState:
    state['branch_1_visited'] = True
    return state

def branch_2_node(state: ParallelState) -> ParallelState:
    state['branch_2_visited'] = True
    return state

def aggregation_node(state: ParallelState) -> ParallelState:
    # This node will only be reached after both branches complete
    assert state['branch_1_visited']
    assert state['branch_2_visited']
    return state

graph = OrkesGraph(ParallelState)

graph.add_node('branch_1', branch_1_node)
graph.add_node('branch_2', branch_2_node)
graph.add_node('aggregator', aggregation_node)

graph.add_parallel_edges(
    graph.START,
    to_nodes=['branch_1', 'branch_2'],
    aggregation_node='aggregator'
)

# Edges from parallel branches to the aggregation node
graph.add_edge('branch_1', 'aggregator')
graph.add_edge('branch_2', 'aggregator')

# Edge from the aggregation node to the end
graph.add_edge('aggregator', graph.END)