Source code for borgllm.openai

"""
OpenAI client integration for BorgLLM.

This module provides BorgOpenAI and BorgAsyncOpenAI clients that are drop-in
replacements for openai.OpenAI and openai.AsyncOpenAI, with automatic:
- Provider resolution from model IDs (e.g., "openai:gpt-4o", "google:gemini-2.5-flash")
- Rate limit handling with automatic retry
- API key rotation
- Virtual provider support
- Cooldown management
"""

import asyncio
import time
import logging
from typing import Any, Dict, Iterator, AsyncIterator, Optional, Union, List, Literal, overload

from openai import OpenAI, AsyncOpenAI, RateLimitError
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from openai.types.responses import Response
from openai._streaming import Stream, AsyncStream

from .borgllm import BorgLLM, LLMProviderConfig
from .core import RateLimitHandler, ConfigResolver

logger = logging.getLogger(__name__)
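
# A minimal usage sketch (kept as a comment so nothing runs on import). It
# assumes a borg.yaml in the working directory that defines the providers
# referenced below; BorgOpenAI then acts as a drop-in for openai.OpenAI and
# picks the provider from the "provider:model" prefix of the model ID:
#
#     from borgllm import BorgOpenAI
#
#     client = BorgOpenAI(config_file="borg.yaml")
#     response = client.chat.completions.create(
#         model="google:gemini-2.5-flash",
#         messages=[{"role": "user", "content": "Hello!"}],
#     )
#     print(response.choices[0].message.content)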


class BorgChatCompletions:
    """
    Proxy for chat.completions that handles BorgLLM provider resolution.
    
    Duck-types as openai.resources.chat.Completions.
    """
    
    def __init__(self, borg_client: "BorgOpenAI"):
        self._borg_client = borg_client
    
    @overload
    def create(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str,
        stream: Literal[True],
        **kwargs,
    ) -> Stream[ChatCompletionChunk]:
        ...
    
    @overload
    def create(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str,
        stream: Literal[False] = False,
        **kwargs,
    ) -> ChatCompletion:
        ...
    
    @overload
    def create(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str,
        stream: bool = False,
        **kwargs,
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        ...
    
    def create(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str,
        stream: bool = False,
        **kwargs,
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        """
        Create a chat completion with automatic provider resolution.
        
        Args:
            messages: List of message dicts with 'role' and 'content'
            model: Model identifier in format "provider:model" (e.g., "openai:gpt-4o")
            stream: Whether to stream the response
            **kwargs: Additional arguments passed to the OpenAI API
            
        Returns:
            ChatCompletion or Stream[ChatCompletionChunk] if streaming
        """
        return self._borg_client._create_chat_completion(
            messages=messages,
            model=model,
            stream=stream,
            **kwargs,
        )
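
# Sketch of the streaming call shape described by the overloads above
# (assuming an "openai" provider is configured): with stream=True the proxy
# returns the underlying openai Stream of ChatCompletionChunk objects, which
# can be iterated directly.
#
#     client = BorgOpenAI()
#     stream = client.chat.completions.create(
#         model="openai:gpt-4o",
#         messages=[{"role": "user", "content": "Write a haiku."}],
#         stream=True,
#     )
#     for chunk in stream:
#         if chunk.choices[0].delta.content:
#             print(chunk.choices[0].delta.content, end="")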


class BorgAsyncChatCompletions:
    """
    Async proxy for chat.completions that handles BorgLLM provider resolution.
    
    Duck-types as openai.resources.chat.AsyncCompletions.
    """
    
    def __init__(self, borg_client: "BorgAsyncOpenAI"):
        self._borg_client = borg_client
    
    @overload
    async def create(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str,
        stream: Literal[True],
        **kwargs,
    ) -> AsyncStream[ChatCompletionChunk]:
        ...
    
    @overload
    async def create(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str,
        stream: Literal[False] = False,
        **kwargs,
    ) -> ChatCompletion:
        ...
    
    @overload
    async def create(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str,
        stream: bool = False,
        **kwargs,
    ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
        ...
    
    async def create(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str,
        stream: bool = False,
        **kwargs,
    ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
        """
        Create a chat completion asynchronously with automatic provider resolution.
        
        Args:
            messages: List of message dicts with 'role' and 'content'
            model: Model identifier in format "provider:model" (e.g., "openai:gpt-4o")
            stream: Whether to stream the response
            **kwargs: Additional arguments passed to the OpenAI API
            
        Returns:
            ChatCompletion or AsyncStream[ChatCompletionChunk] if streaming
        """
        return await self._borg_client._create_chat_completion(
            messages=messages,
            model=model,
            stream=stream,
            **kwargs,
        )
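
# Async counterpart sketch (same configuration assumptions): create() is a
# coroutine, so it must be awaited, and a streaming call yields an AsyncStream
# that is consumed with "async for".
#
#     import asyncio
#     from borgllm import BorgAsyncOpenAI
#
#     async def main() -> None:
#         client = BorgAsyncOpenAI()
#         stream = await client.chat.completions.create(
#             model="openai:gpt-4o",
#             messages=[{"role": "user", "content": "Hello!"}],
#             stream=True,
#         )
#         async for chunk in stream:
#             if chunk.choices[0].delta.content:
#                 print(chunk.choices[0].delta.content, end="")
#
#     asyncio.run(main())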


class BorgChat:
    """
    Proxy for chat namespace.
    
    Duck-types as openai.resources.Chat.
    """
    
    def __init__(self, borg_client: "BorgOpenAI"):
        self._borg_client = borg_client
        self._completions: Optional[BorgChatCompletions] = None
    
    @property
    def completions(self) -> BorgChatCompletions:
        if self._completions is None:
            self._completions = BorgChatCompletions(self._borg_client)
        return self._completions


class BorgAsyncChat:
    """
    Async proxy for chat namespace.
    
    Duck-types as openai.resources.AsyncChat.
    """
    
    def __init__(self, borg_client: "BorgAsyncOpenAI"):
        self._borg_client = borg_client
        self._completions: Optional[BorgAsyncChatCompletions] = None
    
    @property
    def completions(self) -> BorgAsyncChatCompletions:
        if self._completions is None:
            self._completions = BorgAsyncChatCompletions(self._borg_client)
        return self._completions


class BorgResponses:
    """
    Proxy for responses API that handles BorgLLM provider resolution.
    
    Duck-types as openai.resources.Responses.
    """
    
    def __init__(self, borg_client: "BorgOpenAI"):
        self._borg_client = borg_client
    
    @overload
    def create(
        self,
        *,
        model: str,
        input: Union[str, List[Dict[str, Any]]],
        stream: Literal[True],
        **kwargs,
    ) -> Stream[Response]:
        ...
    
    @overload
    def create(
        self,
        *,
        model: str,
        input: Union[str, List[Dict[str, Any]]],
        stream: Literal[False] = False,
        **kwargs,
    ) -> Response:
        ...
    
    @overload
    def create(
        self,
        *,
        model: str,
        input: Union[str, List[Dict[str, Any]]],
        stream: bool = False,
        **kwargs,
    ) -> Union[Response, Stream[Response]]:
        ...
    
    def create(
        self,
        *,
        model: str,
        input: Union[str, List[Dict[str, Any]]],
        stream: bool = False,
        **kwargs,
    ) -> Union[Response, Stream[Response]]:
        """
        Create a response with automatic provider resolution.
        
        Args:
            model: Model identifier in format "provider:model" (e.g., "openai:gpt-4o")
            input: The input text or messages
            stream: Whether to stream the response
            **kwargs: Additional arguments passed to the OpenAI API
            
        Returns:
            Response or Stream[Response] if streaming
        """
        return self._borg_client._create_response(
            model=model,
            input=input,
            stream=stream,
            **kwargs,
        )
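
# Sketch of the Responses API proxy (assuming an "openai" provider is
# configured): input may be a plain string or a list of message dicts, and the
# resolved provider's concrete model name is substituted before the underlying
# client.responses.create call. Reading result.output_text follows the
# standard openai Response object.
#
#     client = BorgOpenAI()
#     result = client.responses.create(
#         model="openai:gpt-4o",
#         input="Summarize the plot of Hamlet in one sentence.",
#     )
#     print(result.output_text)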


class BorgAsyncResponses:
    """
    Async proxy for responses API that handles BorgLLM provider resolution.
    
    Duck-types as openai.resources.AsyncResponses.
    """
    
    def __init__(self, borg_client: "BorgAsyncOpenAI"):
        self._borg_client = borg_client
    
    @overload
    async def create(
        self,
        *,
        model: str,
        input: Union[str, List[Dict[str, Any]]],
        stream: Literal[True],
        **kwargs,
    ) -> AsyncStream[Response]:
        ...
    
    @overload
    async def create(
        self,
        *,
        model: str,
        input: Union[str, List[Dict[str, Any]]],
        stream: Literal[False] = False,
        **kwargs,
    ) -> Response:
        ...
    
    @overload
    async def create(
        self,
        *,
        model: str,
        input: Union[str, List[Dict[str, Any]]],
        stream: bool = False,
        **kwargs,
    ) -> Union[Response, AsyncStream[Response]]:
        ...
    
    async def create(
        self,
        *,
        model: str,
        input: Union[str, List[Dict[str, Any]]],
        stream: bool = False,
        **kwargs,
    ) -> Union[Response, AsyncStream[Response]]:
        """
        Create a response asynchronously with automatic provider resolution.
        
        Args:
            model: Model identifier in format "provider:model" (e.g., "openai:gpt-4o")
            input: The input text or messages
            stream: Whether to stream the response
            **kwargs: Additional arguments passed to the OpenAI API
            
        Returns:
            Response or AsyncStream[Response] if streaming
        """
        return await self._borg_client._create_response(
            model=model,
            input=input,
            stream=stream,
            **kwargs,
        )


class BorgOpenAI:
    """
    Drop-in replacement for openai.OpenAI with BorgLLM integration.

    Automatically handles:
    - Provider resolution from model IDs (e.g., "openai:gpt-4o")
    - Rate limit detection and retry
    - API key rotation
    - Virtual provider support
    - Cooldown management

    Example::

        from borgllm import BorgOpenAI

        client = BorgOpenAI()
        response = client.chat.completions.create(
            model="openai:gpt-4o",
            messages=[{"role": "user", "content": "Hello!"}]
        )
    """

    def __init__(
        self,
        config_file: str = "borg.yaml",
        initial_config_data: Optional[Dict[str, Any]] = None,
        overrides: Optional[Dict[str, Any]] = None,
        cooldown: Optional[Union[int, Dict[str, int]]] = None,
        timeout: Optional[Union[float, Dict[str, float]]] = None,
        max_retries: int = 10,
        **kwargs,
    ):
        """
        Initialize the BorgOpenAI client.

        Args:
            config_file: Path to the BorgLLM configuration file
            initial_config_data: Optional initial configuration data as dictionary
            overrides: Optional dictionary of settings to override
            cooldown: Optional cooldown configuration (int or dict)
            timeout: Optional timeout configuration (float or dict)
            max_retries: Maximum number of retries on rate limit errors
            **kwargs: Additional arguments (reserved for future use)
        """
        self._borgllm_config = BorgLLM.get_instance(
            config_path=config_file,
            initial_config_data=initial_config_data,
        )

        if cooldown is not None:
            self._borgllm_config.set_cooldown_config(cooldown)
        if timeout is not None:
            self._borgllm_config.set_timeout_config(timeout)

        self._overrides = overrides or {}
        self._max_retries = max_retries
        self._retry_delay = 0.1

        # Cache for OpenAI clients per provider
        self._clients: Dict[str, OpenAI] = {}

        # Current request state
        self._current_provider_name: Optional[str] = None
        self._current_provider_config: Optional[LLMProviderConfig] = None

        # Lazy-initialized proxies
        self._chat: Optional[BorgChat] = None
        self._responses: Optional[BorgResponses] = None

    @property
    def chat(self) -> BorgChat:
        """Access the chat completions API."""
        if self._chat is None:
            self._chat = BorgChat(self)
        return self._chat

    @property
    def responses(self) -> BorgResponses:
        """Access the responses API."""
        if self._responses is None:
            self._responses = BorgResponses(self)
        return self._responses

    def _get_or_create_client(self, provider_config: LLMProviderConfig) -> OpenAI:
        """
        Get or create an OpenAI client for the given provider config.

        Args:
            provider_config: The resolved provider configuration

        Returns:
            Configured OpenAI client
        """
        # Create a new client with current config (API key may have rotated)
        client = OpenAI(
            api_key=provider_config.api_key,
            base_url=str(provider_config.base_url),
        )
        return client

    def _resolve_provider(self, model_id: str) -> LLMProviderConfig:
        """
        Resolve provider configuration from a model ID.

        Args:
            model_id: Model identifier (e.g., "openai:gpt-4o")

        Returns:
            Resolved LLMProviderConfig
        """
        provider_config = self._borgllm_config.get(
            model_id,
            timeout=30,
            allow_await_cooldown=True,
        )

        # Apply overrides
        if self._overrides:
            for key, value in self._overrides.items():
                if hasattr(provider_config, key):
                    setattr(provider_config, key, value)

        self._current_provider_name = provider_config.name
        self._current_provider_config = provider_config
        return provider_config

    def _handle_rate_limit(self, error: Exception, retry_count: int) -> int:
        """Handle rate limit error and return updated retry count."""
        provider_name = self._current_provider_name or "unknown"
        logger.warning(f"Rate limit error for provider {provider_name}: {error}")
        self._borgllm_config.signal_429(provider_name)

        retry_count += 1
        if retry_count >= self._max_retries:
            logger.error(f"Max retries ({self._max_retries}) reached for provider {provider_name}")
            raise
        return retry_count

    def _log_error(self, error: Exception) -> None:
        """Log detailed error information."""
        provider_name = self._current_provider_name or "unknown"
        logger.error(f"Non-rate-limit error for provider {provider_name}: {error}")
        logger.error("--------------------------------")
        logger.error("Config Debug Info:")
        if self._current_provider_config:
            logger.error(f"  base_url: {self._current_provider_config.base_url}")
            logger.error(f"  model: {self._current_provider_config.model}")
            logger.error(f"  provider_name: {provider_name}")
        logger.error("--------------------------------")

    def _create_chat_completion(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str,
        stream: bool = False,
        **kwargs,
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        """
        Internal method to create a chat completion with retry logic.
        """
        retry_count = 0

        while retry_count < self._max_retries:
            try:
                # Resolve provider fresh for each attempt
                provider_config = self._resolve_provider(model)
                client = self._get_or_create_client(provider_config)

                # Make the API call
                return client.chat.completions.create(
                    model=provider_config.model,
                    messages=messages,
                    stream=stream,
                    **kwargs,
                )
            except RateLimitError as e:
                retry_count = self._handle_rate_limit(e, retry_count)
                time.sleep(self._retry_delay)
                continue
            except Exception as e:
                self._log_error(e)
                raise

        raise RuntimeError(f"Failed to complete request after {self._max_retries} retries")

    def _create_response(
        self,
        *,
        model: str,
        input: Union[str, List[Dict[str, Any]]],
        stream: bool = False,
        **kwargs,
    ) -> Union[Response, Stream[Response]]:
        """
        Internal method to create a response with retry logic.
        """
        retry_count = 0

        while retry_count < self._max_retries:
            try:
                # Resolve provider fresh for each attempt
                provider_config = self._resolve_provider(model)
                client = self._get_or_create_client(provider_config)

                # Make the API call
                return client.responses.create(
                    model=provider_config.model,
                    input=input,
                    stream=stream,
                    **kwargs,
                )
            except RateLimitError as e:
                retry_count = self._handle_rate_limit(e, retry_count)
                time.sleep(self._retry_delay)
                continue
            except Exception as e:
                self._log_error(e)
                raise

        raise RuntimeError(f"Failed to complete request after {self._max_retries} retries")


class BorgAsyncOpenAI:
    """
    Drop-in replacement for openai.AsyncOpenAI with BorgLLM integration.

    Automatically handles:
    - Provider resolution from model IDs (e.g., "openai:gpt-4o")
    - Rate limit detection and retry
    - API key rotation
    - Virtual provider support
    - Cooldown management

    Example::

        from borgllm import BorgAsyncOpenAI

        client = BorgAsyncOpenAI()
        response = await client.chat.completions.create(
            model="openai:gpt-4o",
            messages=[{"role": "user", "content": "Hello!"}]
        )
    """

    def __init__(
        self,
        config_file: str = "borg.yaml",
        initial_config_data: Optional[Dict[str, Any]] = None,
        overrides: Optional[Dict[str, Any]] = None,
        cooldown: Optional[Union[int, Dict[str, int]]] = None,
        timeout: Optional[Union[float, Dict[str, float]]] = None,
        max_retries: int = 10,
        **kwargs,
    ):
        """
        Initialize the BorgAsyncOpenAI client.

        Args:
            config_file: Path to the BorgLLM configuration file
            initial_config_data: Optional initial configuration data as dictionary
            overrides: Optional dictionary of settings to override
            cooldown: Optional cooldown configuration (int or dict)
            timeout: Optional timeout configuration (float or dict)
            max_retries: Maximum number of retries on rate limit errors
            **kwargs: Additional arguments (reserved for future use)
        """
        self._borgllm_config = BorgLLM.get_instance(
            config_path=config_file,
            initial_config_data=initial_config_data,
        )

        if cooldown is not None:
            self._borgllm_config.set_cooldown_config(cooldown)
        if timeout is not None:
            self._borgllm_config.set_timeout_config(timeout)

        self._overrides = overrides or {}
        self._max_retries = max_retries
        self._retry_delay = 0.1

        # Cache for AsyncOpenAI clients per provider
        self._clients: Dict[str, AsyncOpenAI] = {}

        # Current request state
        self._current_provider_name: Optional[str] = None
        self._current_provider_config: Optional[LLMProviderConfig] = None

        # Lazy-initialized proxies
        self._chat: Optional[BorgAsyncChat] = None
        self._responses: Optional[BorgAsyncResponses] = None

    @property
    def chat(self) -> BorgAsyncChat:
        """Access the chat completions API."""
        if self._chat is None:
            self._chat = BorgAsyncChat(self)
        return self._chat

    @property
    def responses(self) -> BorgAsyncResponses:
        """Access the responses API."""
        if self._responses is None:
            self._responses = BorgAsyncResponses(self)
        return self._responses

    def _get_or_create_client(self, provider_config: LLMProviderConfig) -> AsyncOpenAI:
        """
        Get or create an AsyncOpenAI client for the given provider config.

        Args:
            provider_config: The resolved provider configuration

        Returns:
            Configured AsyncOpenAI client
        """
        # Create a new client with current config (API key may have rotated)
        client = AsyncOpenAI(
            api_key=provider_config.api_key,
            base_url=str(provider_config.base_url),
        )
        return client

    def _resolve_provider(self, model_id: str) -> LLMProviderConfig:
        """
        Resolve provider configuration from a model ID.

        Args:
            model_id: Model identifier (e.g., "openai:gpt-4o")

        Returns:
            Resolved LLMProviderConfig
        """
        provider_config = self._borgllm_config.get(
            model_id,
            timeout=30,
            allow_await_cooldown=True,
        )

        # Apply overrides
        if self._overrides:
            for key, value in self._overrides.items():
                if hasattr(provider_config, key):
                    setattr(provider_config, key, value)

        self._current_provider_name = provider_config.name
        self._current_provider_config = provider_config
        return provider_config

    def _handle_rate_limit(self, error: Exception, retry_count: int) -> int:
        """Handle rate limit error and return updated retry count."""
        provider_name = self._current_provider_name or "unknown"
        logger.warning(f"Rate limit error for provider {provider_name}: {error}")
        self._borgllm_config.signal_429(provider_name)

        retry_count += 1
        if retry_count >= self._max_retries:
            logger.error(f"Max retries ({self._max_retries}) reached for provider {provider_name}")
            raise
        return retry_count

    def _log_error(self, error: Exception) -> None:
        """Log detailed error information."""
        provider_name = self._current_provider_name or "unknown"
        logger.error(f"Non-rate-limit error for provider {provider_name}: {error}")
        logger.error("--------------------------------")
        logger.error("Config Debug Info:")
        if self._current_provider_config:
            logger.error(f"  base_url: {self._current_provider_config.base_url}")
            logger.error(f"  model: {self._current_provider_config.model}")
            logger.error(f"  provider_name: {provider_name}")
        logger.error("--------------------------------")

    async def _create_chat_completion(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str,
        stream: bool = False,
        **kwargs,
    ) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
        """
        Internal method to create a chat completion with retry logic.
        """
        retry_count = 0

        while retry_count < self._max_retries:
            try:
                # Resolve provider fresh for each attempt
                provider_config = self._resolve_provider(model)
                client = self._get_or_create_client(provider_config)

                # Make the API call
                return await client.chat.completions.create(
                    model=provider_config.model,
                    messages=messages,
                    stream=stream,
                    **kwargs,
                )
            except RateLimitError as e:
                retry_count = self._handle_rate_limit(e, retry_count)
                await asyncio.sleep(self._retry_delay)
                continue
            except Exception as e:
                self._log_error(e)
                raise

        raise RuntimeError(f"Failed to complete request after {self._max_retries} retries")

    async def _create_response(
        self,
        *,
        model: str,
        input: Union[str, List[Dict[str, Any]]],
        stream: bool = False,
        **kwargs,
    ) -> Union[Response, AsyncStream[Response]]:
        """
        Internal method to create a response with retry logic.
        """
        retry_count = 0

        while retry_count < self._max_retries:
            try:
                # Resolve provider fresh for each attempt
                provider_config = self._resolve_provider(model)
                client = self._get_or_create_client(provider_config)

                # Make the API call
                return await client.responses.create(
                    model=provider_config.model,
                    input=input,
                    stream=stream,
                    **kwargs,
                )
            except RateLimitError as e:
                retry_count = self._handle_rate_limit(e, retry_count)
                await asyncio.sleep(self._retry_delay)
                continue
            except Exception as e:
                self._log_error(e)
                raise

        raise RuntimeError(f"Failed to complete request after {self._max_retries} retries")
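
# Behavioural sketch of the retry path (assuming an "openai" provider in
# borg.yaml): on a 429 the client signals the provider's cooldown, re-resolves
# the provider for the next attempt (which may rotate API keys or select a
# different provider from a virtual pool), and sleeps briefly before retrying;
# after max_retries consecutive rate limits the last RateLimitError is
# re-raised to the caller.
#
#     from openai import RateLimitError
#
#     client = BorgOpenAI(cooldown=60, max_retries=5)
#     try:
#         response = client.chat.completions.create(
#             model="openai:gpt-4o",
#             messages=[{"role": "user", "content": "Hello!"}],
#         )
#     except RateLimitError:
#         # All retries exhausted; the final 429 propagated.
#         ...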