Source code for orkes.agents.schema

from abc import ABC, abstractmethod
from typing import Union, Dict, Any, AsyncIterator, Iterator
from orkes.services.schema import LLMInterface
import uuid

[docs] class AgentInterface(ABC): """Abstract base class for all agents. This interface defines the common methods that all agents should implement. Agents are responsible for processing input queries and returning results, either synchronously or asynchronously, as a single response or a stream. """ @abstractmethod def invoke(self, queries: Union[str, Dict[str, Any]]) -> Any: """Invoke the agent with a message. Args: queries (Union[str, Dict[str, Any]]): The input to the agent. Returns: Any: The agent's response. """ pass @abstractmethod async def ainvoke(self, queries: Union[str, Dict[str, Any]]) -> Any: """Asynchronously invoke the agent with a message. Args: queries (Union[str, Dict[str, Any]]): The input to the agent. Returns: Any: The agent's response. """ pass @abstractmethod def stream(self, queries: Union[str, Dict[str, Any]]) -> Iterator[Any]: """Stream the agent's response synchronously. Args: queries (Union[str, Dict[str, Any]]): The input to the agent. Returns: Iterator[Any]: An iterator of the agent's response. """ pass @abstractmethod async def astream(self, queries: Union[str, Dict[str, Any]]) -> AsyncIterator[Any]: """Stream the agent's response asynchronously. Args: queries (Union[str, Dict[str, Any]]): The input to the agent. Returns: AsyncIterator[Any]: An async iterator of the agent's response. """ pass
[docs] class Agent(AgentInterface): """Base class for agents. This class provides a default implementation of the `AgentInterface`. It is intended to be subclassed by specific agent implementations. Attributes: name (str): The name of the agent. llm_interface (LLMInterface): The LLM interface to use. """
[docs] def __init__(self, name: str, llm: LLMInterface): """Initializes an Agent. Args: name (str): The name of the agent. llm (LLMInterface): The LLM interface to use. """ self.name = name self.llm_interface = llm
def invoke(self, queries: Union[str, Dict[str, Any]]) -> Any: raise NotImplementedError async def ainvoke(self, queries: Union[str, Dict[str, Any]]) -> Any: raise NotImplementedError def stream(self, queries: Union[str, Dict[str, Any]]) -> Iterator[Any]: raise NotImplementedError async def astream(self, queries: Union[str, Dict[str, Any]]) -> AsyncIterator[Any]: raise NotImplementedError