Source code for orkes.services.schema
from typing import Optional, Dict, AsyncGenerator, Any, List, Union
from abc import ABC, abstractmethod
from requests import Response
from pydantic import BaseModel
from orkes.shared.schema import OrkesMessagesSchema, OrkesToolSchema, RequestSchema
[docs]
class LLMProviderStrategy(ABC):
"""Abstract base class for LLM provider strategies.
This class defines the interface for handling provider-specific logic, such as
preparing payloads, parsing responses, and generating headers. Each provider
(e.g., OpenAI, Anthropic) should have its own implementation of this class.
"""
@abstractmethod
def prepare_payload(self, model: str, messages: OrkesMessagesSchema, stream: bool, settings: Dict, tools: Optional[List[Dict]] = None) -> Dict:
"""Prepares the payload for a request to the LLM provider.
Args:
model (str): The name of the model to use.
messages (OrkesMessagesSchema): The messages to send to the LLM.
stream (bool): Whether to stream the response.
settings (Dict): A dictionary of settings for the request.
tools (Optional[List[Dict]], optional): A list of tools to provide to the
LLM. Defaults to None.
Returns:
Dict: The prepared payload.
"""
pass
@abstractmethod
def parse_response(self, response_data: Dict) -> RequestSchema:
"""Parses a response from the LLM provider.
Args:
response_data (Dict): The response data from the provider.
Returns:
RequestSchema: The parsed response.
"""
pass
@abstractmethod
def parse_stream_chunk(self, chunk: str) -> Optional[str]:
"""Parses a single chunk of a streaming response.
Args:
chunk (str): A chunk of the response.
Returns:
Optional[str]: The parsed content of the chunk, or None if the chunk is
empty or a stop token.
"""
pass
@abstractmethod
def get_headers(self, api_key: str) -> Dict[str, str]:
"""Gets the authentication headers for the LLM provider.
Args:
api_key (str): The API key for the provider.
Returns:
Dict[str, str]: A dictionary of headers.
"""
pass
@abstractmethod
def get_messages_payload(self, messages: OrkesMessagesSchema) -> List[Dict]:
"""Converts an Orkes message schema to a format suitable for the LLM provider.
Args:
messages (OrkesMessagesSchema): The messages to convert.
Returns:
List[Dict]: The messages in the provider's format.
"""
pass
@abstractmethod
def get_tools_payload(self, tools: List[OrkesToolSchema]) -> List[Dict]:
"""Converts an Orkes tool schema to a format suitable for the LLM provider.
Args:
tools (List[OrkesToolSchema]): The tools to convert.
Returns:
List[Dict]: The tools in the provider's format.
"""
pass
[docs]
class LLMInterface(ABC):
"""Abstract base class for LLM connections.
This class defines the interface for sending and streaming messages to an LLM,
as well as for performing health checks.
"""
@abstractmethod
def send_message(self, message, **kwargs) -> Response:
"""Sends a message to the LLM and receives the full response.
Args:
message: The message to send.
**kwargs: Additional keyword arguments.
Returns:
Response: The response from the LLM.
"""
pass
@abstractmethod
async def stream_message(self, message, **kwargs) -> AsyncGenerator[str, None]:
"""Streams the response from the LLM incrementally.
Args:
message: The message to send.
**kwargs: Additional keyword arguments.
Yields:
str: A chunk of the response.
"""
pass
@abstractmethod
def health_check(self) -> Response:
"""Checks the health status of the LLM server.
Returns:
Response: The response from the health check.
"""
pass