#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Google Gemini Live API service implementation.

This module provides real-time conversational AI capabilities using Google's
Gemini Live API, supporting both text and audio modalities with
voice transcription, streaming responses, and tool usage.
"""

import asyncio
import base64
import io
import time
import uuid
import warnings
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from loguru import logger
from PIL import Image
from pydantic import BaseModel, Field

from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter
from pipecat.frames.frames import (
    AggregationType,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    CancelFrame,
    EndFrame,
    Frame,
    InputAudioRawFrame,
    InputImageRawFrame,
    InputTextRawFrame,
    InterruptionFrame,
    LLMContextFrame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    LLMMessagesAppendFrame,
    LLMSetToolsFrame,
    LLMTextFrame,
    LLMThoughtEndFrame,
    LLMThoughtStartFrame,
    LLMThoughtTextFrame,
    StartFrame,
    TranscriptionFrame,
    TTSAudioRawFrame,
    TTSStartedFrame,
    TTSStoppedFrame,
    TTSTextFrame,
    UserImageRawFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantAggregatorParams,
    LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import (
    OpenAILLMContext,
    OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame, LLMSearchResult
from pipecat.services.google.utils import update_google_client_http_options
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.openai.llm import (
    OpenAIAssistantContextAggregator,
    OpenAIUserContextAggregator,
)
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven, _warn_deprecated_param
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.string import match_endofsentence
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_gemini_live, traced_stt

from .file_api import GeminiFileAPI

try:
    from google.genai import Client
    from google.genai.live import AsyncSession
    from google.genai.types import (
        AudioTranscriptionConfig,
        AutomaticActivityDetection,
        Blob,
        Content,
        ContextWindowCompressionConfig,
        EndSensitivity,
        FileData,
        FunctionResponse,
        GenerationConfig,
        GroundingMetadata,
        HttpOptions,
        LiveConnectConfig,
        LiveServerMessage,
        MediaResolution,
        Modality,
        Part,
        ProactivityConfig,
        RealtimeInputConfig,
        SessionResumptionConfig,
        SlidingWindow,
        SpeechConfig,
        StartSensitivity,
        ThinkingConfig,
        VoiceConfig,
    )
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
    raise Exception(f"Missing module: {e}")


# Connection management constants
MAX_CONSECUTIVE_FAILURES = 3
CONNECTION_ESTABLISHED_THRESHOLD = 10.0  # seconds


def language_to_gemini_language(language: Language) -> Optional[str]:
    """Maps a Language enum value to a Gemini Live supported language code.

    Source:
    https://ai.google.dev/api/generate-content#MediaResolution

    NOTE(review): the URL above anchors to ``MediaResolution``, not a
    supported-languages list — confirm the intended reference.

    Args:
        language: The language enum value to convert.

    Returns:
        The Gemini language code string, or None if the language is not supported.
    """
    LANGUAGE_MAP = {
        # Arabic
        Language.AR: "ar-XA",
        # Bengali
        Language.BN_IN: "bn-IN",
        # Chinese (Mandarin)
        Language.CMN: "cmn-CN",
        Language.CMN_CN: "cmn-CN",
        Language.ZH: "cmn-CN",  # Map general Chinese to Mandarin for Gemini
        Language.ZH_CN: "cmn-CN",  # Map Simplified Chinese to Mandarin for Gemini
        # German
        Language.DE: "de-DE",
        Language.DE_DE: "de-DE",
        # English
        Language.EN: "en-US",  # Default to US English (though not explicitly listed in supported codes)
        Language.EN_US: "en-US",
        Language.EN_AU: "en-AU",
        Language.EN_GB: "en-GB",
        Language.EN_IN: "en-IN",
        # Spanish
        Language.ES: "es-ES",  # Default to Spain Spanish
        Language.ES_ES: "es-ES",
        Language.ES_US: "es-US",
        # French
        Language.FR: "fr-FR",  # Default to France French
        Language.FR_FR: "fr-FR",
        Language.FR_CA: "fr-CA",
        # Gujarati
        Language.GU: "gu-IN",
        Language.GU_IN: "gu-IN",
        # Hindi
        Language.HI: "hi-IN",
        Language.HI_IN: "hi-IN",
        # Indonesian
        Language.ID: "id-ID",
        Language.ID_ID: "id-ID",
        # Italian
        Language.IT: "it-IT",
        Language.IT_IT: "it-IT",
        # Japanese
        Language.JA: "ja-JP",
        Language.JA_JP: "ja-JP",
        # Kannada
        Language.KN: "kn-IN",
        Language.KN_IN: "kn-IN",
        # Korean
        Language.KO: "ko-KR",
        Language.KO_KR: "ko-KR",
        # Malayalam
        Language.ML: "ml-IN",
        Language.ML_IN: "ml-IN",
        # Marathi
        Language.MR: "mr-IN",
        Language.MR_IN: "mr-IN",
        # Dutch
        Language.NL: "nl-NL",
        Language.NL_NL: "nl-NL",
        # Polish
        Language.PL: "pl-PL",
        Language.PL_PL: "pl-PL",
        # Portuguese (Brazil)
        Language.PT_BR: "pt-BR",
        # Russian
        Language.RU: "ru-RU",
        Language.RU_RU: "ru-RU",
        # Tamil
        Language.TA: "ta-IN",
        Language.TA_IN: "ta-IN",
        # Telugu
        Language.TE: "te-IN",
        Language.TE_IN: "te-IN",
        # Thai
        Language.TH: "th-TH",
        Language.TH_TH: "th-TH",
        # Turkish
        Language.TR: "tr-TR",
        Language.TR_TR: "tr-TR",
        # Vietnamese
        Language.VI: "vi-VN",
        Language.VI_VN: "vi-VN",
    }

    # use_base_code=False: no automatic fallback from regional variants to a
    # base code; generic entries (e.g. Language.EN -> "en-US") are spelled
    # out explicitly in the map above instead.
    return resolve_language(language, LANGUAGE_MAP, use_base_code=False)


class GeminiLiveContext(OpenAILLMContext):
    """Extended OpenAI context for Gemini Live API.

    Provides Gemini-specific context management including system instruction
    extraction and message format conversion for the Live API.

    .. deprecated:: 0.0.92
        Gemini Live no longer uses `GeminiLiveContext` under the hood.
        It now uses `LLMContext`.
    """

    @staticmethod
    def upgrade(obj: OpenAILLMContext) -> "GeminiLiveContext":
        """Upgrade an OpenAI context to Gemini context.

        Args:
            obj: The OpenAI context to upgrade.

        Returns:
            The upgraded Gemini context instance.
        """
        # The deprecation warning lives here rather than in `__init__`
        # because `upgrade()` was the primary creation path for these
        # instances: users typically passed an OpenAILLMContext that was
        # upgraded under the hood without their direct involvement, so very
        # few of them should ever see this message.
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "GeminiLiveContext is deprecated. "
                "Gemini Live no longer uses GeminiLiveContext under the hood. "
                "It now uses LLMContext.",
                DeprecationWarning,
                stacklevel=2,
            )

        needs_upgrade = isinstance(obj, OpenAILLMContext) and not isinstance(
            obj, GeminiLiveContext
        )
        if needs_upgrade:
            logger.debug(f"Upgrading to Gemini Live Context: {obj}")
            # In-place class swap, preserving all existing context state.
            obj.__class__ = GeminiLiveContext
            obj._restructure_from_openai_messages()
        return obj

    def _restructure_from_openai_messages(self):
        # No restructuring currently required; hook kept for compatibility.
        pass

    def extract_system_instructions(self):
        """Extract system instructions from context messages.

        Returns:
            Combined system instruction text from all system messages.
        """
        combined = ""
        for message in self.messages:
            if message.get("role") != "system":
                continue
            text = message.get("content", "")
            if not text:
                continue
            # Separate successive system messages with a newline.
            if combined and not combined.endswith("\n"):
                combined += "\n"
            combined += str(text)
        return combined

    def add_file_reference(self, file_uri: str, mime_type: str, text: Optional[str] = None):
        """Add a file reference to the context.

        This adds a user message with a file reference that will be sent during context initialization.

        Args:
            file_uri: URI of the uploaded file
            mime_type: MIME type of the file
            text: Optional text prompt to accompany the file
        """
        # Optional text prompt first, then the file reference part.
        content = [{"type": "text", "text": text}] if text else []
        content.append(
            {"type": "file_data", "file_data": {"mime_type": mime_type, "file_uri": file_uri}}
        )
        self.messages.append({"role": "user", "content": content})
        logger.info(f"Added file reference to context: {file_uri}")

    def get_messages_for_initializing_history(self) -> List[Content]:
        """Get messages formatted for Gemini history initialization.

        Returns:
            List of messages in Gemini format for conversation history.
        """
        history: List[Content] = []
        for message in self.messages:
            role = message.get("role")

            # System messages are not part of conversation history.
            if role == "system":
                continue
            if role == "assistant":
                role = "model"  # Gemini's name for the assistant role

            content = message.get("content")
            parts: List[Part] = []
            if isinstance(content, str):
                parts.append(Part(text=content))
            elif isinstance(content, list):
                for piece in content:
                    kind = piece.get("type")
                    if kind == "text":
                        parts.append(Part(text=piece.get("text")))
                    elif kind == "file_data":
                        file_data = piece.get("file_data", {})
                        parts.append(
                            Part(
                                file_data=FileData(
                                    mime_type=file_data.get("mime_type"),
                                    file_uri=file_data.get("file_uri"),
                                )
                            )
                        )
                    else:
                        logger.warning(f"Unsupported content type: {str(piece)[:80]}")
            else:
                logger.warning(f"Unsupported content type: {str(content)[:80]}")
            history.append(Content(role=role, parts=parts))
        return history


class GeminiLiveUserContextAggregator(OpenAIUserContextAggregator):
    """User context aggregator for Gemini Live.

    Extends OpenAI user aggregator to handle Gemini-specific message passing
    while maintaining compatibility with the standard aggregation pipeline.

    .. deprecated:: 0.0.92
        Gemini Live no longer expects a `GeminiLiveUserContextAggregator`.
        It now expects a `LLMUserAggregator`.
    """

    def __init__(self, *args, **kwargs):
        """Initialize Gemini Live user context aggregator."""
        # Instances were typically created under the hood by
        # `llm.create_context_aggregator()`, so almost no users should see
        # this deprecation warning directly.
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "GeminiLiveUserContextAggregator is deprecated. "
                "Gemini Live no longer expects a GeminiLiveUserContextAggregator. "
                "It now expects a LLMUserAggregator.",
                DeprecationWarning,
                stacklevel=2,
            )
        super().__init__(*args, **kwargs)

    async def process_frame(self, frame, direction):
        """Process incoming frames for user context aggregation.

        Args:
            frame: The frame to process.
            direction: The frame processing direction.
        """
        await super().process_frame(frame, direction)
        # Workaround: explicitly forward LLMMessagesAppendFrame downstream so
        # it is not swallowed by the aggregation pipeline.
        if isinstance(frame, LLMMessagesAppendFrame):
            await self.push_frame(frame, direction)


class GeminiLiveAssistantContextAggregator(OpenAIAssistantContextAggregator):
    """Assistant context aggregator for Gemini Live.

    Handles assistant response aggregation while filtering out LLMTextFrames
    to prevent duplicate context entries, as Gemini Live pushes both
    LLMTextFrames and TTSTextFrames.

    .. deprecated:: 0.0.92
        Gemini Live no longer uses `GeminiLiveAssistantContextAggregator` under the hood.
        It now uses `LLMAssistantAggregator`.
    """

    def __init__(self, *args, **kwargs):
        """Initialize Gemini Live assistant context aggregator."""
        # Instances were typically created under the hood by
        # `llm.create_context_aggregator()`, so almost no users should see
        # this deprecation warning directly.
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "GeminiLiveAssistantContextAggregator is deprecated. "
                "Gemini Live no longer uses GeminiLiveAssistantContextAggregator under the hood. "
                "It now uses LLMAssistantAggregator.",
                DeprecationWarning,
                stacklevel=2,
            )
        super().__init__(*args, **kwargs)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames for assistant context aggregation.

        Args:
            frame: The frame to process.
            direction: The frame processing direction.
        """
        # LLMAssistantContextAggregator aggregates LLM output via TextFrames,
        # but Gemini Live emits both LLMTextFrames and TTSTextFrames for the
        # same content. Dropping LLMTextFrames here means only the
        # TTSTextFrames are processed, so the context records a single copy
        # of each message.
        if isinstance(frame, LLMTextFrame):
            return
        await super().process_frame(frame, direction)

    async def handle_user_image_frame(self, frame: UserImageRawFrame):
        """Handle user image frames.

        Args:
            frame: The user image frame to handle.
        """
        # Intentionally a no-op: images are not stored in the context.
        # Revisit this later when the API evolves.
        pass


@dataclass
class GeminiLiveContextAggregatorPair:
    """Pair of user and assistant context aggregators for Gemini Live.

    .. deprecated:: 0.0.92
        `GeminiLiveContextAggregatorPair` is deprecated.
        Use `LLMContextAggregatorPair` instead.

    Parameters:
        _user: The user context aggregator instance.
        _assistant: The assistant context aggregator instance.
    """

    _user: GeminiLiveUserContextAggregator
    _assistant: GeminiLiveAssistantContextAggregator

    def __post_init__(self):
        # Instances were typically created under the hood with
        # `llm.create_context_aggregator()`, so almost no users should see
        # this deprecation warning directly.
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "GeminiLiveContextAggregatorPair is deprecated. "
                "Use LLMContextAggregatorPair instead.",
                DeprecationWarning,
                stacklevel=2,
            )

    def user(self) -> GeminiLiveUserContextAggregator:
        """Get the user context aggregator.

        Returns:
            The user context aggregator instance.
        """
        return self._user

    def assistant(self) -> GeminiLiveAssistantContextAggregator:
        """Get the assistant context aggregator.

        Returns:
            The assistant context aggregator instance.
        """
        return self._assistant


class GeminiModalities(Enum):
    """Supported response modalities for Gemini Live.

    Parameters:
        TEXT: Text responses.
        AUDIO: Audio responses.
    """

    TEXT = "TEXT"
    AUDIO = "AUDIO"


class GeminiMediaResolution(str, Enum):
    """Media resolution options for Gemini Live.

    Member values mirror the API's ``MEDIA_RESOLUTION_*`` string constants;
    the str mixin lets them be used directly where a string is expected.

    Parameters:
        UNSPECIFIED: Use default resolution setting.
        LOW: Low resolution with 64 tokens.
        MEDIUM: Medium resolution with 256 tokens.
        HIGH: High resolution with zoomed reframing and 256 tokens.
    """

    UNSPECIFIED = "MEDIA_RESOLUTION_UNSPECIFIED"  # Use default
    LOW = "MEDIA_RESOLUTION_LOW"  # 64 tokens
    MEDIUM = "MEDIA_RESOLUTION_MEDIUM"  # 256 tokens
    HIGH = "MEDIA_RESOLUTION_HIGH"  # Zoomed reframing with 256 tokens


class GeminiVADParams(BaseModel):
    """Voice Activity Detection parameters for Gemini Live.

    Sensitivity fields use the ``google.genai`` ``StartSensitivity`` /
    ``EndSensitivity`` enums. Fields left at ``None`` are not explicitly set.

    Parameters:
        disabled: Whether to disable VAD. Defaults to None.
        start_sensitivity: Sensitivity for speech start detection. Defaults to None.
        end_sensitivity: Sensitivity for speech end detection. Defaults to None.
        prefix_padding_ms: Prefix padding in milliseconds. Defaults to None.
        silence_duration_ms: Silence duration threshold in milliseconds. Defaults to None.
    """

    disabled: Optional[bool] = Field(default=None)
    start_sensitivity: Optional[StartSensitivity] = Field(default=None)
    end_sensitivity: Optional[EndSensitivity] = Field(default=None)
    prefix_padding_ms: Optional[int] = Field(default=None)
    silence_duration_ms: Optional[int] = Field(default=None)


class ContextWindowCompressionParams(BaseModel):
    """Parameters for context window compression in Gemini Live.

    Parameters:
        enabled: Whether compression is enabled. Defaults to False.
        trigger_tokens: Token count to trigger compression. None uses 80% of context window.
    """

    enabled: bool = Field(default=False)
    trigger_tokens: Optional[int] = Field(
        default=None
    )  # None = use default (80% of context window)


class InputParams(BaseModel):
    """Input parameters for Gemini Live generation.

    .. deprecated:: 0.0.105
        Use ``GeminiLiveLLMSettings`` instead.

    Parameters:
        frequency_penalty: Frequency penalty for generation (0.0-2.0). Defaults to None.
        max_tokens: Maximum tokens to generate. Must be >= 1. Defaults to 4096.
        presence_penalty: Presence penalty for generation (0.0-2.0). Defaults to None.
        temperature: Sampling temperature (0.0-2.0). Defaults to None.
        top_k: Top-k sampling parameter. Must be >= 0. Defaults to None.
        top_p: Top-p sampling parameter (0.0-1.0). Defaults to None.
        modalities: Response modalities. Defaults to AUDIO.
        language: Language for generation. Defaults to EN_US.
        media_resolution: Media resolution setting. Defaults to UNSPECIFIED.
        vad: Voice activity detection parameters. Defaults to None.
        context_window_compression: Context compression settings. Defaults to None.
        thinking: Thinking settings. Defaults to None.
            Note that these settings may require specifying a model that
            supports them, e.g. "gemini-2.5-flash-native-audio-preview-12-2025".
        enable_affective_dialog: Enable affective dialog, which allows Gemini
            to adapt to expression and tone. Defaults to None.
            Note that these settings may require specifying a model that
            supports them, e.g. "gemini-2.5-flash-native-audio-preview-12-2025".
            Also note that this setting may require specifying an API version that
            supports it, e.g. HttpOptions(api_version="v1alpha").
        proactivity: Proactivity settings, which allows Gemini to proactively
            decide how to behave, such as whether to avoid responding to
            content that is not relevant. Defaults to None.
            Note that these settings may require specifying a model that
            supports them, e.g. "gemini-2.5-flash-native-audio-preview-12-2025".
            Also note that this setting may require specifying an API version that
            supports it, e.g. HttpOptions(api_version="v1alpha").
        extra: Additional parameters. Defaults to empty dict.
    """

    # Numeric sampling parameters are range-validated by pydantic (ge/le).
    frequency_penalty: Optional[float] = Field(default=None, ge=0.0, le=2.0)
    max_tokens: Optional[int] = Field(default=4096, ge=1)
    presence_penalty: Optional[float] = Field(default=None, ge=0.0, le=2.0)
    temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0)
    top_k: Optional[int] = Field(default=None, ge=0)
    top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    modalities: Optional[GeminiModalities] = Field(default=GeminiModalities.AUDIO)
    language: Optional[Language] = Field(default=Language.EN_US)
    media_resolution: Optional[GeminiMediaResolution] = Field(
        default=GeminiMediaResolution.UNSPECIFIED
    )
    vad: Optional[GeminiVADParams] = Field(default=None)
    context_window_compression: Optional[ContextWindowCompressionParams] = Field(default=None)
    thinking: Optional[ThinkingConfig] = Field(default=None)
    enable_affective_dialog: Optional[bool] = Field(default=None)
    proactivity: Optional[ProactivityConfig] = Field(default=None)
    extra: Optional[Dict[str, Any]] = Field(default_factory=dict)


@dataclass
class GeminiLiveLLMSettings(LLMSettings):
    """Settings for GeminiLiveLLMService.

    Every field defaults to the ``NOT_GIVEN`` sentinel so that "unset" is
    distinguishable from an explicit ``None`` when applying settings deltas.

    Parameters:
        voice: TTS voice identifier (e.g. ``"Charon"``).
        modalities: Response modalities.
        language: Language for generation.
        media_resolution: Media resolution setting.
        vad: Voice activity detection parameters.
        context_window_compression: Context window compression configuration.
        thinking: Thinking configuration.
        enable_affective_dialog: Whether to enable affective dialog.
        proactivity: Proactivity configuration.
    """

    voice: str | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    modalities: GeminiModalities | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    language: Language | str | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    media_resolution: GeminiMediaResolution | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    vad: GeminiVADParams | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    context_window_compression: ContextWindowCompressionParams | dict | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )
    thinking: ThinkingConfig | dict | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    enable_affective_dialog: bool | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    proactivity: ProactivityConfig | dict | _NotGiven = field(default_factory=lambda: NOT_GIVEN)


class GeminiLiveLLMService(LLMService):
    """Provides access to Google's Gemini Live API.

    This service enables real-time conversations with Gemini, supporting both
    text and audio modalities. It handles voice transcription, streaming audio
    responses, and tool usage.
    """

    Settings = GeminiLiveLLMSettings
    _settings: GeminiLiveLLMSettings

    # Overriding the default adapter to use the Gemini one.
    adapter_class = GeminiLLMAdapter

    def __init__(
        self,
        *,
        api_key: str,
        base_url: Optional[str] = None,
        model: Optional[str] = None,
        voice_id: str = "Charon",
        start_audio_paused: bool = False,
        start_video_paused: bool = False,
        system_instruction: Optional[str] = None,
        tools: Optional[Union[List[dict], ToolsSchema]] = None,
        params: Optional[InputParams] = None,
        settings: Optional[GeminiLiveLLMSettings] = None,
        inference_on_context_initialization: bool = True,
        file_api_base_url: str = "https://generativelanguage.googleapis.com/v1beta/files",
        http_options: Optional[HttpOptions] = None,
        **kwargs,
    ):
        """Initialize the Gemini Live LLM service.

        Args:
            api_key: Google AI API key for authentication.
            base_url: API endpoint base URL. Defaults to the official Gemini Live endpoint.

                .. deprecated:: 0.0.90
                    This parameter is deprecated and no longer has any effect.
                    Please use `http_options` to customize requests made by the
                    API client.

            model: Model identifier to use.

                .. deprecated:: 0.0.105
                    Use ``settings=GeminiLiveLLMSettings(model=...)`` instead.

            voice_id: TTS voice identifier. Defaults to "Charon".

                .. deprecated:: 0.0.105
                    Use ``settings=GeminiLiveLLMSettings(voice=...)`` instead.
            start_audio_paused: Whether to start with audio input paused. Defaults to False.
            start_video_paused: Whether to start with video input paused. Defaults to False.
            system_instruction: System prompt for the model. Defaults to None.
            tools: Tools/functions available to the model. Defaults to None.
            params: Configuration parameters for the model.

                .. deprecated:: 0.0.105
                    Use ``settings=GeminiLiveLLMSettings(...)`` instead.

            settings: Gemini Live LLM settings. If provided together with deprecated
                top-level parameters, the ``settings`` values take precedence.
            inference_on_context_initialization: Whether to generate a response when context
                is first set. Defaults to True.
            file_api_base_url: Base URL for the Gemini File API. Defaults to the official endpoint.
            http_options: HTTP options for the client.
            **kwargs: Additional arguments passed to parent LLMService.
        """
        # Check for deprecated parameter usage. (`warnings` is imported at
        # module level; the former function-local `import warnings` here was
        # redundant and has been removed.)
        if base_url is not None:
            with warnings.catch_warnings():
                warnings.simplefilter("always")
                warnings.warn(
                    "Parameter 'base_url' is deprecated and no longer has any effect. Please use 'http_options' to customize requests made by the API client.",
                    DeprecationWarning,
                    stacklevel=2,
                )

        # 1. Initialize default_settings with hardcoded defaults
        default_settings = GeminiLiveLLMSettings(
            model="models/gemini-2.5-flash-native-audio-preview-12-2025",
            system_instruction=system_instruction,
            voice="Charon",
            frequency_penalty=None,
            max_tokens=4096,
            presence_penalty=None,
            temperature=None,
            top_k=None,
            top_p=None,
            seed=None,
            filter_incomplete_user_turns=False,
            user_turn_completion_config=None,
            modalities=GeminiModalities.AUDIO,
            language="en-US",
            media_resolution=GeminiMediaResolution.UNSPECIFIED,
            vad=None,
            context_window_compression={},
            thinking={},
            enable_affective_dialog=False,
            proactivity={},
            extra={},
        )

        # 2. Apply direct init arg overrides (deprecated)
        if model is not None:
            _warn_deprecated_param("model", GeminiLiveLLMSettings, "model")
            default_settings.model = model
        # NOTE: an explicitly-passed voice_id of "Charon" is indistinguishable
        # from the default, so it neither warns nor overrides.
        if voice_id != "Charon":
            _warn_deprecated_param("voice_id", GeminiLiveLLMSettings, "voice")
            default_settings.voice = voice_id

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            _warn_deprecated_param("params", GeminiLiveLLMSettings)
            if not settings:
                default_settings.frequency_penalty = params.frequency_penalty
                default_settings.max_tokens = params.max_tokens
                default_settings.presence_penalty = params.presence_penalty
                default_settings.temperature = params.temperature
                default_settings.top_k = params.top_k
                default_settings.top_p = params.top_p
                default_settings.modalities = params.modalities
                # Convert the Language enum to a Gemini language code string.
                default_settings.language = (
                    language_to_gemini_language(params.language) if params.language else "en-US"
                )
                default_settings.media_resolution = params.media_resolution
                default_settings.vad = params.vad
                default_settings.context_window_compression = (
                    params.context_window_compression.model_dump()
                    if params.context_window_compression
                    else {}
                )
                default_settings.thinking = params.thinking or {}
                default_settings.enable_affective_dialog = params.enable_affective_dialog or False
                default_settings.proactivity = params.proactivity or {}
                if isinstance(params.extra, dict):
                    default_settings.extra = params.extra

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            base_url=base_url,
            settings=default_settings,
            **kwargs,
        )

        self._last_sent_time = 0
        self._base_url = base_url

        self._system_instruction_from_init = system_instruction
        self._tools_from_init = tools
        self._inference_on_context_initialization = inference_on_context_initialization
        self._needs_turn_complete_message = False

        self._audio_input_paused = start_audio_paused
        self._video_input_paused = start_video_paused
        self._context = None
        self._api_key = api_key
        self._http_options = update_google_client_http_options(http_options)
        # Live session and its background task; populated on connect.
        self._session: Optional[AsyncSession] = None
        self._connection_task = None

        self._disconnecting = False
        self._run_llm_when_session_ready = False

        # Conversation-state bookkeeping for user/bot turns and buffers.
        self._user_is_speaking = False
        self._bot_is_responding = False
        self._user_audio_buffer = bytearray()
        self._user_transcription_buffer = ""
        self._last_transcription_sent = ""
        self._bot_audio_buffer = bytearray()
        self._bot_text_buffer = ""
        self._llm_output_buffer = ""
        self._transcription_timeout_task = None

        self._sample_rate = 24000

        self._language = self._settings.language
        self._language_code = (
            language_to_gemini_language(self._settings.language)
            if self._settings.language
            else "en-US"
        )
        self._vad_params = self._settings.vad

        # Reconnection tracking
        self._consecutive_failures = 0
        self._connection_start_time = None

        self._file_api_base_url = file_api_base_url
        self._file_api: Optional[GeminiFileAPI] = None

        # Grounding metadata tracking
        self._search_result_buffer = ""
        self._accumulated_grounding_metadata = None

        # Session resumption
        self._session_resumption_handle: Optional[str] = None

        # Bookkeeping for ending gracefully (i.e. after the bot is finished)
        self._end_frame_pending_bot_turn_finished: Optional[EndFrame] = None

        # Initialize the API client. Subclasses can override this if needed.
        self.create_client()

        # Bookkeeping for tool calls
        self._completed_tool_calls = set()

    def create_client(self):
        """Create the Gemini API client instance. Subclasses can override this.

        Builds the client from the API key and HTTP options captured at
        initialization time and stores it on ``self._client``.
        """
        self._client = Client(api_key=self._api_key, http_options=self._http_options)

    @property
    def file_api(self) -> GeminiFileAPI:
        """Get the Gemini File API client instance. Subclasses can override this.

        The client is created lazily on first access and cached for
        subsequent accesses.

        Returns:
            The Gemini File API client.
        """
        if not self._file_api:
            self._file_api = GeminiFileAPI(api_key=self._api_key, base_url=self._file_api_base_url)
        return self._file_api

    def can_generate_metrics(self) -> bool:
        """Report whether this service can produce usage metrics.

        Returns:
            True, since Gemini Live provides token usage metadata.
        """
        return True

    async def _update_settings(self, delta: LLMSettings) -> dict[str, Any]:
        """Apply a settings delta.

        Settings are stored but not applied to the active connection.

        Args:
            delta: The settings delta to apply.

        Returns:
            The changed settings, as reported by the base class.
        """
        changed = await super()._update_settings(delta)

        if not changed:
            return changed

        # TODO: someday we could reconnect here to apply updated settings.
        # Code might look something like the below:
        # await self._disconnect()
        # await self._connect()

        # Warn that these changes will not take effect on the live session.
        self._warn_unhandled_updated_settings(changed)

        return changed

    def set_audio_input_paused(self, paused: bool):
        """Pause or resume forwarding of user audio to the service.

        Args:
            paused: True to stop sending audio input; False to resume.
        """
        self._audio_input_paused = paused

    def set_video_input_paused(self, paused: bool):
        """Pause or resume forwarding of user video to the service.

        Args:
            paused: True to stop sending video input; False to resume.
        """
        self._video_input_paused = paused

    def set_model_modalities(self, modalities: GeminiModalities):
        """Set the model response modalities.

        Stored in settings; applied on the next connection (see _connect).

        Args:
            modalities: The modalities to use for responses.
        """
        self._settings.modalities = modalities

    def set_language(self, language: Language):
        """Set the language for generation.

        Args:
            language: The language to use for generation.
        """
        self._language = language
        # Fall back to "en-US" when the language has no Gemini mapping.
        self._language_code = language_to_gemini_language(language) or "en-US"
        # NOTE(review): __init__ stores the Language value in
        # self._settings.language, while here the resolved code string is
        # stored — confirm downstream consumers accept both forms.
        self._settings.language = self._language_code
        logger.info(f"Set Gemini language to: {self._language_code}")

    async def set_context(self, context: OpenAILLMContext):
        """Set the context explicitly from outside the pipeline.

        This is useful when initializing a conversation because in server-side VAD mode we might not have a
        way to trigger the pipeline. This sends the history to the server. The `inference_on_context_initialization`
        flag controls whether to set the turnComplete flag when we do this. Without that flag, the model will
        not respond. This is often what we want when setting the context at the beginning of a conversation.

        Args:
            context: The OpenAI LLM context to set.
        """
        # The context can only be set once per service instance.
        if self._context:
            logger.error("Context already set. Can only set up Gemini Live context once.")
            return
        # Upgrade the OpenAI context to the Gemini Live-specific context type.
        self._context = GeminiLiveContext.upgrade(context)
        await self._create_initial_response()

    #
    # standard AIService frame handling
    #

    async def start(self, frame: StartFrame):
        """Start the service and establish connection.

        Delegates to the base class, then opens the Gemini Live connection.

        Args:
            frame: The start frame.
        """
        await super().start(frame)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the service and close connections.

        Delegates to the base class, then tears down the Gemini Live session.

        Args:
            frame: The end frame.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the service and close connections.

        Delegates to the base class, then tears down the Gemini Live session.

        Args:
            frame: The cancel frame.
        """
        await super().cancel(frame)
        await self._disconnect()

    #
    # speech and interruption handling
    #

    async def _handle_interruption(self):
        if self._bot_is_responding:
            await self._set_bot_is_responding(False)
            if self._settings.modalities == GeminiModalities.AUDIO:
                await self.push_frame(TTSStoppedFrame())
            # Do not send LLMFullResponseEndFrame here - an interruption
            # already tells the assistant context aggregator that the response
            # is over.

    async def _handle_user_started_speaking(self, frame):
        self._user_is_speaking = True
        pass

    async def _handle_user_stopped_speaking(self, frame):
        self._user_is_speaking = False
        self._user_audio_buffer = bytearray()
        await self.start_ttfb_metrics()
        if self._needs_turn_complete_message:
            self._needs_turn_complete_message = False
            # NOTE: without this, the model ignores the context it's been
            # seeded with before the user started speaking
            await self._session.send_client_content(turn_complete=True)

    #
    # frame processing
    # StartFrame, StopFrame, CancelFrame implemented in base class
    #

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames for the Gemini Live service.

        Routes input frames (text/audio/video) to the live session, tracks
        speaking and interruption state, and handles context frames. Most
        frames are also passed through in their original direction.

        Args:
            frame: The frame to process.
            direction: The frame processing direction.
        """
        # Defer EndFrame handling until after the bot turn is finished
        if isinstance(frame, EndFrame):
            if self._bot_is_responding:
                logger.debug("Deferring handling EndFrame until bot turn is finished")
                # The frame is held (not forwarded) until
                # _set_bot_is_responding(False) queues it.
                self._end_frame_pending_bot_turn_finished = frame
                return

        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionFrame):
            await self.push_frame(frame, direction)
        elif isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
            # Normalize both context frame flavors to the universal LLMContext.
            context = (
                frame.context
                if isinstance(frame, LLMContextFrame)
                else LLMContext.from_openai_context(frame.context)
            )
            await self._handle_context(context)
        elif isinstance(frame, InputTextRawFrame):
            await self._send_user_text(frame.text)
            await self.push_frame(frame, direction)
        elif isinstance(frame, InputAudioRawFrame):
            await self._send_user_audio(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, InputImageRawFrame):
            await self._send_user_video(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, InterruptionFrame):
            await self._handle_interruption()
            await self.push_frame(frame, direction)
        elif isinstance(frame, UserStartedSpeakingFrame):
            await self._handle_user_started_speaking(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, UserStoppedSpeakingFrame):
            await self._handle_user_stopped_speaking(frame)
            await self.push_frame(frame, direction)
        elif isinstance(frame, BotStartedSpeakingFrame):
            # Ignore this frame. Use the serverContent API message instead
            await self.push_frame(frame, direction)
        elif isinstance(frame, BotStoppedSpeakingFrame):
            # ignore this frame. Use the serverContent.turnComplete API message
            await self.push_frame(frame, direction)
        elif isinstance(frame, LLMMessagesAppendFrame):
            # NOTE: handling LLMMessagesAppendFrame here in the LLMService is
            # unusual - typically this would be handled in the user context
            # aggregator. Leaving this handling here so that user code that
            # uses this frame *without* a user context aggregator still works
            # (we have an example that does just that, actually).
            await self._create_single_response(frame.messages)
        elif isinstance(frame, LLMSetToolsFrame):
            # TODO: implement runtime tool updates for Gemini Live.
            pass
        else:
            await self.push_frame(frame, direction)

    async def _handle_context(self, context: LLMContext):
        """Handle a new or updated LLM context.

        On the first context: adopt it, reconnect if it supplies a system
        instruction or tools (context-provided values take precedence over
        init-time ones), seed the completed-tool-call bookkeeping, and
        trigger an initial response. On subsequent contexts: record and
        forward newly-completed tool call results.

        Args:
            context: The universal LLM context to handle.
        """
        if not self._context:
            # We got our initial context
            self._context = context

            # If context contains system instruction or tools, reconnect in
            # order to apply them.
            # (Context-provided system instruction and tools take precedence
            # over the ones provided at initialization time. Note that we could
            # do more sophisticated comparisons here, but for now this is
            # sufficient: we'll assume folks won't mean to provide these
            # settings both in the context and at initialization time. In a
            # future change, we could/should implement the ability to swap
            # these settings at any point).
            adapter: GeminiLLMAdapter = self.get_llm_adapter()
            params = adapter.get_llm_invocation_params(self._context)
            system_instruction = params["system_instruction"]
            tools = params["tools"]
            if system_instruction and self._system_instruction_from_init:
                logger.warning(
                    "System instruction provided both at init time and in context; using context-provided value."
                )
            if tools and self._tools_from_init:
                logger.warning(
                    "Tools provided both at init time and in context; using context-provided value."
                )
            if system_instruction or tools:
                await self._reconnect()

            # Initialize our bookkeeping of already-completed tool calls in
            # the context
            await self._process_completed_function_calls(send_new_results=False)

            # Create initial response if needed, based on conversation history
            # in context.
            # (If the context has no messages but we do have a system
            # instruction — meaning it was provided at init time — doctor our
            # context now so that we'll have something to send to the service
            # to trigger a response).
            messages = params["messages"]
            if not messages and self._inference_on_context_initialization:
                if self._system_instruction_from_init:
                    logger.debug(
                        "No messages found in initial context; seeding with system instruction to trigger bot response."
                    )
                    self._context.add_message(
                        {"role": "system", "content": self._system_instruction_from_init}
                    )
                else:
                    logger.warning(
                        "No messages found in initial context; cannot trigger initial bot response without messages or system instruction."
                    )
            await self._create_initial_response()
        else:
            # We got an updated context.
            self._context = context

            # Here we assume that the updated context will contain either:
            # - new messages (that the Gemini Live service, with its own
            #   context management, is already aware of), or
            # - tool call results (that we need to tell the remote service
            #   about).
            # (In the future, we could do more sophisticated diffing here,
            # which would enable the user to programmatically manipulate the
            # context).

            # Send results for newly-completed function calls, if any.
            await self._process_completed_function_calls(send_new_results=True)

    async def _process_completed_function_calls(self, send_new_results: bool):
        # Check for set of completed function calls in the context
        adapter: GeminiLLMAdapter = self.get_llm_adapter()
        messages = adapter.get_llm_invocation_params(self._context).get("messages", [])
        for message in messages:
            if message.parts:
                for part in message.parts:
                    if part.function_response:
                        tool_call_id = part.function_response.id
                        tool_name = part.function_response.name
                        response = part.function_response.response
                        if (
                            tool_call_id
                            and tool_call_id not in self._completed_tool_calls
                            and response
                            and response.get("value") != "IN_PROGRESS"
                        ):
                            # Found a newly-completed function call - send the result to the service
                            if send_new_results:
                                await self._tool_result(
                                    tool_call_id, tool_name, part.function_response.response
                                )
                            self._completed_tool_calls.add(tool_call_id)

    async def _set_bot_is_responding(self, responding: bool):
        if self._bot_is_responding == responding:
            return

        self._bot_is_responding = responding

        if not self._bot_is_responding and self._end_frame_pending_bot_turn_finished:
            await self.queue_frame(self._end_frame_pending_bot_turn_finished)
            self._end_frame_pending_bot_turn_finished = None

    async def _connect(self, session_resumption_handle: Optional[str] = None):
        """Establish client connection to Gemini Live API.

        Assembles the LiveConnectConfig from current settings (generation
        parameters, transcriptions, optional context-window compression,
        thinking, affective dialog, proactivity, VAD, system instruction and
        tools) and starts the connection task. No-op if a session already
        exists.

        Args:
            session_resumption_handle: Optional handle from a previous
                session, used to resume conversation state after a reconnect.
        """
        if self._session:
            # Here we assume that if we have a client, we are connected. We
            # handle disconnections in the send/recv code paths.
            return

        if session_resumption_handle:
            logger.info(
                f"Connecting to Gemini service with session_resumption_handle: {session_resumption_handle}"
            )
        else:
            logger.info("Connecting to Gemini service")
        try:
            # Assemble basic configuration
            config = LiveConnectConfig(
                generation_config=GenerationConfig(
                    frequency_penalty=self._settings.frequency_penalty,
                    max_output_tokens=self._settings.max_tokens,
                    presence_penalty=self._settings.presence_penalty,
                    temperature=self._settings.temperature,
                    top_k=self._settings.top_k,
                    top_p=self._settings.top_p,
                    response_modalities=[Modality(self._settings.modalities.value)],
                    speech_config=SpeechConfig(
                        voice_config=VoiceConfig(
                            prebuilt_voice_config={"voice_name": self._settings.voice}
                        ),
                        language_code=self._settings.language,
                    ),
                    media_resolution=MediaResolution(self._settings.media_resolution.value),
                ),
                input_audio_transcription=AudioTranscriptionConfig(),
                output_audio_transcription=AudioTranscriptionConfig(),
                session_resumption=SessionResumptionConfig(handle=session_resumption_handle),
            )

            # Add context window compression to configuration, if enabled
            cwc = self._settings.context_window_compression or {}
            if cwc.get("enabled", False):
                compression_config = ContextWindowCompressionConfig()

                # Add sliding window (always true if compression is enabled)
                compression_config.sliding_window = SlidingWindow()

                # Add trigger_tokens if specified
                trigger_tokens = cwc.get("trigger_tokens")
                if trigger_tokens is not None:
                    compression_config.trigger_tokens = trigger_tokens

                config.context_window_compression = compression_config

            # Add thinking configuration to configuration, if provided
            if self._settings.thinking:
                config.thinking_config = self._settings.thinking

            # Add affective dialog setting, if provided
            if self._settings.enable_affective_dialog:
                config.enable_affective_dialog = self._settings.enable_affective_dialog

            # Add proactivity configuration to configuration, if provided
            if self._settings.proactivity:
                config.proactivity = self._settings.proactivity

            # Add VAD configuration to configuration, if provided
            if self._settings.vad:
                vad_config = AutomaticActivityDetection()
                vad_params = self._settings.vad
                has_vad_settings = False

                # Only add parameters that are explicitly set
                if vad_params.disabled is not None:
                    vad_config.disabled = vad_params.disabled
                    has_vad_settings = True

                if vad_params.start_sensitivity:
                    vad_config.start_of_speech_sensitivity = vad_params.start_sensitivity
                    has_vad_settings = True

                if vad_params.end_sensitivity:
                    vad_config.end_of_speech_sensitivity = vad_params.end_sensitivity
                    has_vad_settings = True

                if vad_params.prefix_padding_ms is not None:
                    vad_config.prefix_padding_ms = vad_params.prefix_padding_ms
                    has_vad_settings = True

                if vad_params.silence_duration_ms is not None:
                    vad_config.silence_duration_ms = vad_params.silence_duration_ms
                    has_vad_settings = True

                # Only add automatic_activity_detection if we have VAD settings
                if has_vad_settings:
                    config.realtime_input_config = RealtimeInputConfig(
                        automatic_activity_detection=vad_config
                    )

            # Add system instruction and tools to configuration, if provided.
            # These settings from the context take precedence over the ones
            # provided at initialization time.
            adapter: GeminiLLMAdapter = self.get_llm_adapter()
            system_instruction = None
            tools = None
            if self._context:
                params = adapter.get_llm_invocation_params(self._context)
                system_instruction = params["system_instruction"]
                tools = params["tools"]
            if not system_instruction:
                system_instruction = self._system_instruction_from_init
            if not tools:
                tools = adapter.from_standard_tools(self._tools_from_init)
            if system_instruction:
                logger.debug(f"Setting system instruction: {system_instruction}")
                config.system_instruction = system_instruction
            if tools:
                logger.debug(f"Setting tools: {tools}")
                config.tools = tools

            # Start the connection
            self._connection_task = self.create_task(self._connection_task_handler(config=config))

        except Exception as e:
            await self.push_error(error_msg=f"Initialization error: {e}", exception=e)

    async def _connection_task_handler(self, config: LiveConnectConfig):
        """Run the live connection and dispatch server messages.

        Opens the session, then loops receiving server messages and routing
        each to the appropriate handler. On error, consults
        _handle_connection_error to decide between reconnecting and pushing
        a fatal error.

        Args:
            config: The fully-assembled LiveConnectConfig to connect with.
        """
        async with self._client.aio.live.connect(
            model=self._settings.model, config=config
        ) as session:
            logger.info("Connected to Gemini service")

            # Mark connection start time
            self._connection_start_time = time.time()

            await self._handle_session_ready(session)

            while True:
                try:
                    turn = self._session.receive()
                    async for message in turn:
                        # Reset failure counter if connection has been stable
                        self._check_and_reset_failure_counter()

                        if message.server_content and message.server_content.interrupted:
                            # NOTE: while the service triggers interruptions in
                            # the specific case of barge-ins, it does *not*
                            # emit UserStarted/StoppedSpeakingFrames, as the
                            # Gemini Live API does not give us broadly reliable
                            # signals to base those off of. Pipelines that
                            # require turn tracking (like those using context
                            # aggregators) still need an independent way to
                            # track turns, such as local Silero VAD in
                            # combination with the context aggregator default
                            # turn strategies.
                            logger.debug("Gemini VAD: interrupted signal received")
                            await self.broadcast_interruption()
                        elif message.server_content and message.server_content.model_turn:
                            await self._handle_msg_model_turn(message)
                        elif (
                            message.server_content
                            and message.server_content.turn_complete
                            and message.usage_metadata
                        ):
                            await self._handle_msg_turn_complete(message)
                            await self._handle_msg_usage_metadata(message)
                        elif message.server_content and message.server_content.input_transcription:
                            await self._handle_msg_input_transcription(message)
                        elif message.server_content and message.server_content.output_transcription:
                            await self._handle_msg_output_transcription(message)
                        elif message.server_content and message.server_content.grounding_metadata:
                            await self._handle_msg_grounding_metadata(message)
                        elif message.tool_call:
                            await self._handle_msg_tool_call(message)
                        elif message.session_resumption_update:
                            self._handle_msg_resumption_update(message)
                except Exception as e:
                    if not self._disconnecting:
                        should_reconnect = await self._handle_connection_error(e)
                        if should_reconnect:
                            await self._reconnect()
                            return  # Exit this connection handler, _reconnect will start a new one
                    break

    def _check_and_reset_failure_counter(self):
        """Check if connection has been stable long enough to reset the failure counter.

        If the connection has been active for longer than the established threshold
        and there are accumulated failures, reset the counter to 0.
        """
        if (
            self._connection_start_time
            and self._consecutive_failures > 0
            and time.time() - self._connection_start_time >= CONNECTION_ESTABLISHED_THRESHOLD
        ):
            logger.info(
                f"Connection stable for {CONNECTION_ESTABLISHED_THRESHOLD}s, "
                f"resetting failure counter from {self._consecutive_failures} to 0"
            )
            self._consecutive_failures = 0

    async def _handle_connection_error(self, error: Exception) -> bool:
        """Record a connection failure and decide whether to reconnect.

        Args:
            error: The exception that caused the connection error.

        Returns:
            True if reconnection should be attempted, False if the failure
            budget is exhausted and a fatal error was pushed instead.
        """
        self._consecutive_failures += 1
        failures = self._consecutive_failures
        logger.warning(f"Connection error (failure {failures}/{MAX_CONSECUTIVE_FAILURES}): {error}")

        if failures < MAX_CONSECUTIVE_FAILURES:
            logger.info(f"Attempting reconnection ({failures}/{MAX_CONSECUTIVE_FAILURES})")
            return True

        error_msg = (
            f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, "
            "treating as fatal error"
        )
        await self.push_error(error_msg=error_msg, exception=error)
        return False

    async def _reconnect(self):
        """Reconnect to Gemini Live API."""
        await self._disconnect()
        await self._connect(session_resumption_handle=self._session_resumption_handle)

    async def _disconnect(self):
        """Disconnect from Gemini Live API and clean up resources.

        Cancels the connection and transcription-timeout tasks, closes the
        live session, and clears the completed-tool-call bookkeeping. While
        teardown is in progress, the ``_disconnecting`` flag suppresses sends
        and reconnect attempts.
        """
        logger.info("Disconnecting from Gemini service")
        try:
            self._disconnecting = True
            await self.stop_all_metrics()
            if self._connection_task:
                await self.cancel_task(self._connection_task, timeout=1.0)
                self._connection_task = None
            if self._transcription_timeout_task:
                await self.cancel_task(self._transcription_timeout_task)
                self._transcription_timeout_task = None
            if self._session:
                await self._session.close()
                self._session = None
            self._completed_tool_calls = set()
        except Exception as e:
            await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
        finally:
            # Always clear the flag: previously it was only reset on the
            # success path, so a failed teardown left it stuck at True,
            # permanently blocking future sends and reconnects.
            self._disconnecting = False

    async def _send_user_audio(self, frame):
        """Send user audio frame to Gemini Live API."""
        if self._audio_input_paused or self._disconnecting or not self._session:
            return

        # Send all audio to Gemini
        try:
            await self._session.send_realtime_input(
                audio=Blob(data=frame.audio, mime_type=f"audio/pcm;rate={frame.sample_rate}")
            )
        except Exception as e:
            await self._handle_send_error(e)

        # Manage a buffer of audio to use for transcription
        audio = frame.audio
        if self._user_is_speaking:
            self._user_audio_buffer.extend(audio)
        else:
            # Keep 1/2 second of audio in the buffer even when not speaking.
            self._user_audio_buffer.extend(audio)
            length = int((frame.sample_rate * frame.num_channels * 2) * 0.5)
            self._user_audio_buffer = self._user_audio_buffer[-length:]

    async def _send_user_text(self, text: str):
        """Send user text via Gemini Live API's realtime input stream.

        This method sends text through the realtimeInput stream (via TextInputMessage)
        rather than the clientContent stream. This ensures text input is synchronized
        with audio and video inputs, preventing temporal misalignment that can occur
        when different modalities are processed through separate API pathways.

        For realtimeInput, turn completion is automatically inferred by the API based
        on user activity, so no explicit turnComplete signal is needed.

        Args:
            text: The text to send as user input.
        """
        if self._disconnecting or not self._session:
            return

        try:
            await self._session.send_realtime_input(text=text)
        except Exception as e:
            await self._handle_send_error(e)

    async def _send_user_video(self, frame):
        """Forward a user video frame to Gemini, throttled to one per second.

        The raw image is re-encoded as JPEG and sent base64-encoded over the
        realtime input stream.

        Args:
            frame: The input image frame to send.
        """
        if self._video_input_paused or self._disconnecting or not self._session:
            return

        # Throttle: drop frames arriving less than one second after the last send.
        now = time.time()
        if now - self._last_sent_time < 1:
            return
        self._last_sent_time = now

        logger.trace(f"Sending video frame to Gemini: {frame}")

        # Re-encode the raw image as JPEG and base64-encode it for transport.
        jpeg_buffer = io.BytesIO()
        Image.frombytes(frame.format, frame.size, frame.image).save(jpeg_buffer, format="JPEG")
        encoded = base64.b64encode(jpeg_buffer.getvalue()).decode("utf-8")

        try:
            await self._session.send_realtime_input(
                video=Blob(data=encoded, mime_type="image/jpeg")
            )
        except Exception as e:
            await self._handle_send_error(e)

    async def _create_initial_response(self):
        """Create initial response based on context history.

        Sends the context's messages to the service. If no session exists
        yet, defers the run until the session is ready (see
        _handle_session_ready). Whether the model responds immediately is
        governed by ``inference_on_context_initialization``.
        """
        if self._disconnecting:
            return

        if not self._session:
            # Defer until the session is ready.
            self._run_llm_when_session_ready = True
            return

        adapter: GeminiLLMAdapter = self.get_llm_adapter()
        messages = adapter.get_llm_invocation_params(self._context).get("messages", [])
        if not messages:
            return

        logger.debug(f"Creating initial response: {messages}")

        await self.start_ttfb_metrics()

        try:
            await self._session.send_client_content(
                turns=messages, turn_complete=self._inference_on_context_initialization
            )
        except Exception as e:
            await self._handle_send_error(e)

        # If we're *not* generating a response right away upon initializing
        # conversation history, set a flag saying that we need a turn complete
        # message when the user stops speaking.
        if not self._inference_on_context_initialization:
            self._needs_turn_complete_message = True

    async def _create_single_response(self, messages_list):
        """Send a one-off list of messages to the service and request a response.

        A throwaway LLMContext is used purely to convert the messages into
        the format the Gemini adapter expects.

        Args:
            messages_list: The messages to send, in universal message format.
        """
        if self._disconnecting or not self._session:
            return

        # Wrap the messages in a temporary context so the adapter can
        # convert them to Gemini's wire format.
        throwaway_context = LLMContext(messages=messages_list)
        adapter: GeminiLLMAdapter = self.get_llm_adapter()
        messages = adapter.get_llm_invocation_params(throwaway_context).get("messages", [])
        if not messages:
            return

        logger.debug(f"Creating response: {messages}")

        await self.start_ttfb_metrics()

        try:
            await self._session.send_client_content(turns=messages, turn_complete=True)
        except Exception as e:
            await self._handle_send_error(e)

    @traced_gemini_live(operation="llm_tool_result")
    async def _tool_result(
        self, tool_call_id: str, tool_name: str, tool_result_message: Dict[str, Any]
    ):
        """Send tool result back to the API.

        Args:
            tool_call_id: The ID of the function call being answered.
            tool_name: The name of the function that was called.
            tool_result_message: The result payload to send back.
        """
        if self._disconnecting or not self._session:
            return

        # NOTE(review): an earlier comment here claimed the name was "shoved
        # into the tool_call_id field"; the code below passes name and id in
        # their own fields, so that note appears stale — confirm before
        # relying on it.
        response = FunctionResponse(name=tool_name, id=tool_call_id, response=tool_result_message)

        try:
            await self._session.send_tool_response(function_responses=response)
        except Exception as e:
            await self._handle_send_error(e)

    @traced_gemini_live(operation="llm_setup")
    async def _handle_session_ready(self, session: AsyncSession):
        """Handle the session being ready.

        Stores the session and, if an LLM run was requested before the
        session existed, kicks off the deferred initial response.

        Args:
            session: The now-ready Gemini Live session.
        """
        self._session = session
        # If we were just waiting for the session to be ready to run the LLM,
        # do that now.
        if self._run_llm_when_session_ready:
            self._run_llm_when_session_ready = False
            await self._create_initial_response()

    async def _handle_msg_model_turn(self, msg: LiveServerMessage):
        """Handle the model turn message.

        Emits text/thought frames (TEXT modality), records grounding
        metadata, and emits TTS audio frames (AUDIO modality), based on the
        first part of the model turn.

        Args:
            msg: The server message containing the model turn content.
        """
        # Guard against an empty parts list before indexing: `parts[0]` on an
        # empty list would raise IndexError.
        parts = msg.server_content.model_turn.parts
        if not parts:
            return
        part = parts[0]
        if not part:
            return

        await self.stop_ttfb_metrics()

        # part.text is added when `modalities` is set to TEXT; otherwise, it's None
        text = part.text
        if text:
            if not self._bot_is_responding:
                # Update bot responding state and send service start frame
                # (TEXT modality case)
                await self._set_bot_is_responding(True)
                await self.push_frame(LLMFullResponseStartFrame())

            # Check if this is a thought
            if part.thought:
                # Gemini Live emits fully-formed thoughts rather than chunks,
                # so bracket each thought in start/end frames
                await self.push_frame(LLMThoughtStartFrame())
                await self.push_frame(LLMThoughtTextFrame(text))
                await self.push_frame(LLMThoughtEndFrame())
            else:
                # Regular text response
                self._bot_text_buffer += text
                self._search_result_buffer += text  # Also accumulate for grounding
                await self.push_frame(LLMTextFrame(text=text))

        # Check for grounding metadata in server content
        if msg.server_content and msg.server_content.grounding_metadata:
            self._accumulated_grounding_metadata = msg.server_content.grounding_metadata

        # If we have no audio, stop here.
        # All logic below this point pertains to the AUDIO modality.
        inline_data = part.inline_data
        if not inline_data:
            return

        # Check if mime type matches expected format
        expected_mime_type = f"audio/pcm;rate={self._sample_rate}"
        if inline_data.mime_type == expected_mime_type:
            # Perfect match, continue processing
            pass
        elif inline_data.mime_type == "audio/pcm":
            # Sample rate not provided in mime type; assume the configured
            # rate and warn only once per service instance.
            if not hasattr(self, "_sample_rate_warning_logged"):
                logger.warning(
                    f"Sample rate not provided in mime type '{inline_data.mime_type}', assuming rate of {self._sample_rate}"
                )
                self._sample_rate_warning_logged = True
        else:
            # Unrecognized format
            logger.warning(f"Unrecognized server_content format {inline_data.mime_type}")
            return

        audio = inline_data.data
        if not audio:
            return

        # Update bot responding state and send service start frames
        # (AUDIO modality case)
        if not self._bot_is_responding:
            await self._set_bot_is_responding(True)
            await self.push_frame(TTSStartedFrame())
            await self.push_frame(LLMFullResponseStartFrame())

        self._bot_audio_buffer.extend(audio)
        frame = TTSAudioRawFrame(
            audio=audio,
            sample_rate=self._sample_rate,
            num_channels=1,
        )
        await self.push_frame(frame)

    @traced_gemini_live(operation="llm_tool_call")
    async def _handle_msg_tool_call(self, message: LiveServerMessage):
        """Handle tool call messages.

        Args:
            message: The server message containing the tool call request(s).
        """
        function_calls = message.tool_call.function_calls
        if not function_calls:
            return
        if not self._context:
            logger.error("Function calls are not supported without a context object.")
            # Bail out: without a context we cannot run the calls below.
            # (Previously this fell through and ran them with context=None.)
            return

        function_calls_llm = [
            FunctionCallFromLLM(
                context=self._context,
                tool_call_id=(
                    # NOTE: when using Vertex AI we don't get server-provided
                    # tool call IDs here
                    f.id or str(uuid.uuid4())
                ),
                function_name=f.name,
                arguments=f.args,
            )
            for f in function_calls
        ]

        await self.run_function_calls(function_calls_llm)

    @traced_gemini_live(operation="llm_response")
    async def _handle_msg_turn_complete(self, message: LiveServerMessage):
        """Finalize response state when the server signals the turn is done.

        Args:
            message: The turn-complete server message (consumed by the
                tracing decorator for output text and usage metadata).
        """
        accumulated_text = self._bot_text_buffer
        self._bot_text_buffer = ""
        self._llm_output_buffer = ""

        # Emit a search response frame if grounding metadata accumulated
        # during this turn.
        if self._accumulated_grounding_metadata:
            await self._process_grounding_metadata(
                self._accumulated_grounding_metadata, self._search_result_buffer
            )

        # Reset grounding tracking for the next response.
        self._search_result_buffer = ""
        self._accumulated_grounding_metadata = None

        if self._bot_is_responding:
            await self._set_bot_is_responding(False)
            # An empty text buffer means the AUDIO modality was in use, which
            # also requires closing out the TTS stream.
            if not accumulated_text:
                await self.push_frame(TTSStoppedFrame())
            await self.push_frame(LLMFullResponseEndFrame())

    @traced_stt
    async def _handle_user_transcription(
        self, transcript: str, is_final: bool, language: Optional[Language] = None
    ):
        """Handle a transcription result with tracing.

        The body is intentionally empty: this method exists solely so the
        ``@traced_stt`` decorator can record the transcription event.

        Args:
            transcript: The transcribed user text.
            is_final: Whether this is a final (vs. interim) transcription.
            language: Optional language of the transcription.
        """
        pass

    async def _push_user_transcription(self, text: str, result: Optional[LiveServerMessage] = None):
        """Push a user transcription frame upstream.

        Centralizes transcription delivery so the punctuation-based and
        timeout-based paths are handled identically.

        Args:
            text: The transcription text to push
            result: Optional LiveServerMessage that triggered this transcription
        """
        await self._handle_user_transcription(text, True, self._settings.language)
        transcription_frame = TranscriptionFrame(
            text=text,
            user_id="",
            timestamp=time_now_iso8601(),
            result=result,
        )
        await self.push_frame(transcription_frame, FrameDirection.UPSTREAM)

    async def _transcription_timeout_handler(self):
        """Flush the user transcription buffer after a period of silence.

        If no new transcription messages arrive within the timeout period,
        any text remaining in the buffer is emitted as a complete sentence.
        """
        try:
            # Give new transcription chunks 0.5 seconds to arrive.
            await asyncio.sleep(0.5)

            remaining_text = self._user_transcription_buffer
            if remaining_text:
                logger.trace(
                    f"[Transcription:user:timeout] Flushing buffer: [{remaining_text}]"
                )
                self._user_transcription_buffer = ""
                await self._push_user_transcription(remaining_text, result=None)
        except asyncio.CancelledError:
            # Expected when back-to-back transcription messages arrive: new
            # text cancels the pending flush.
            logger.trace("Transcription timeout task cancelled (new text arrived)")
            raise

    async def _handle_msg_input_transcription(self, message: LiveServerMessage):
        """Handle the input transcription message.

        Gemini Live delivers user transcriptions as single words or
        multi-word phrases, so this handler aggregates them into sentences,
        splitting on end-of-sentence markers. If no punctuation arrives
        within a timeout period, the buffer is flushed automatically.
        """
        if not message.server_content.input_transcription:
            return

        text = message.server_content.input_transcription.text
        if not text:
            return

        # New text arrived, so any pending timeout flush is now obsolete.
        if self._transcription_timeout_task:
            await self.cancel_task(self._transcription_timeout_task)
            self._transcription_timeout_task = None

        # Drop a leading space when starting a fresh sentence.
        if not self._user_transcription_buffer and text.startswith(" "):
            text = text.lstrip()

        self._user_transcription_buffer += text

        # Emit every complete sentence currently sitting in the buffer.
        eos_end_marker = match_endofsentence(self._user_transcription_buffer)
        while eos_end_marker:
            # Split off the complete sentence; keep the remainder buffered.
            complete_sentence = self._user_transcription_buffer[:eos_end_marker]
            self._user_transcription_buffer = self._user_transcription_buffer[eos_end_marker:]

            logger.debug(f"[Transcription:user] [{complete_sentence}]")
            await self._push_user_transcription(complete_sentence, result=message)

            eos_end_marker = match_endofsentence(self._user_transcription_buffer)

        # Any leftover text has no end-of-sentence marker yet; schedule a
        # timeout flush for it.
        if self._user_transcription_buffer:
            self._transcription_timeout_task = self.create_task(
                self._transcription_timeout_handler()
            )
            # Yield so the event loop can schedule the task before any
            # subsequent cancellation.
            await asyncio.sleep(0)

    async def _handle_msg_output_transcription(self, message: LiveServerMessage):
        """Handle the output transcription message."""
        output_transcription = message.server_content.output_transcription
        if not output_transcription:
            return

        # When modalities is set to AUDIO, the spoken text arrives here; we
        # push TTSTextFrames for the downstream assistant context aggregator.
        text = output_transcription.text
        if not text:
            return

        # Accumulate for grounding.
        self._search_result_buffer += text

        # Record any grounding metadata carried by this message.
        if message.server_content and message.server_content.grounding_metadata:
            self._accumulated_grounding_metadata = message.server_content.grounding_metadata

        # Accumulate for tracing.
        self._llm_output_buffer += text

        # NOTE: Shoot. When using Vertex AI, output transcription messages
        # arrive *before* the model_turn messages with audio, so we need to
        # handle sending TTSStartedFrame and LLMFullResponseStartFrame here as
        # well. These messages also contain much *more* text (it looks further
        # ahead). That means that on an interruption our recorded context will
        # contain some text that was actually never spoken.
        if not self._bot_is_responding:
            await self._set_bot_is_responding(True)
            await self.push_frame(TTSStartedFrame())
            await self.push_frame(LLMFullResponseStartFrame())

        await self._push_output_transcription_text_frames(text)

    async def _push_output_transcription_text_frames(self, text: str):
        """Push LLM and TTS text frames for a chunk of output transcription.

        In a typical "cascade" LLM + TTS setup, LLMTextFrames would not
        proceed beyond the TTS service, so a speech-to-speech service like
        Gemini Live (which combines LLM and TTS functionality) seemingly
        wouldn't need to push them at all. However, RTVI relies on
        LLMTextFrames being pushed to trigger its "bot-llm-text" event, so we
        push one anyway — flagged to skip context appending, avoiding
        duplicated context messages.

        Args:
            text: The transcription text to emit.
        """
        llm_frame = LLMTextFrame(text)
        llm_frame.append_to_context = False
        await self.push_frame(llm_frame)

        tts_frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
        tts_frame.includes_inter_frame_spaces = True
        await self.push_frame(tts_frame)

    async def _handle_msg_grounding_metadata(self, message: LiveServerMessage):
        """Handle dedicated grounding metadata messages."""
        server_content = message.server_content
        if not (server_content and server_content.grounding_metadata):
            return
        # Process immediately rather than waiting for turn completion.
        await self._process_grounding_metadata(
            server_content.grounding_metadata, self._search_result_buffer
        )

    async def _process_grounding_metadata(
        self, grounding_metadata: GroundingMetadata, search_result: str = ""
    ):
        """Process grounding metadata and emit LLMSearchResponseFrame.

        Args:
            grounding_metadata: Grounding metadata received from the server.
            search_result: Accumulated response text associated with the
                grounded search.
        """
        if not grounding_metadata:
            return

        # Rendered content drives search suggestions, when present.
        rendered_content = None
        entry_point = grounding_metadata.search_entry_point
        if entry_point and entry_point.rendered_content:
            rendered_content = entry_point.rendered_content

        # Translate grounding chunks/supports into LLMSearchOrigin objects.
        origins: List[LLMSearchOrigin] = []

        if grounding_metadata.grounding_chunks and grounding_metadata.grounding_supports:
            # Map chunk index -> origin so supports can be attached below.
            chunk_to_origin: Dict[int, LLMSearchOrigin] = {}

            for index, chunk in enumerate(grounding_metadata.grounding_chunks):
                if not chunk.web:
                    continue
                origin = LLMSearchOrigin(
                    site_uri=chunk.web.uri, site_title=chunk.web.title, results=[]
                )
                chunk_to_origin[index] = origin
                origins.append(origin)

            for support in grounding_metadata.grounding_supports:
                if not (support.segment and support.grounding_chunk_indices):
                    continue
                segment_text = support.segment.text or ""
                confidence_scores = support.confidence_scores or []

                # Attach this result to every origin the support references.
                for chunk_index in support.grounding_chunk_indices:
                    origin = chunk_to_origin.get(chunk_index)
                    if origin is not None:
                        origin.results.append(
                            LLMSearchResult(text=segment_text, confidence=confidence_scores)
                        )

        # Push the assembled search response downstream.
        await self.push_frame(
            LLMSearchResponseFrame(
                search_result=search_result, origins=origins, rendered_content=rendered_content
            )
        )

    async def _handle_msg_usage_metadata(self, message: LiveServerMessage):
        """Handle the usage metadata message."""
        usage = message.usage_metadata
        if not usage:
            return

        # Token counts may be absent; default them to 0 so arithmetic is safe.
        prompt_tokens = usage.prompt_token_count or 0
        completion_tokens = usage.response_token_count or 0
        # Prefer the server-reported total, falling back to the local sum.
        total_tokens = usage.total_token_count or (prompt_tokens + completion_tokens)

        await self.start_llm_usage_metrics(
            LLMTokenUsage(
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                total_tokens=total_tokens,
                cache_read_input_tokens=usage.cached_content_token_count,
                reasoning_tokens=usage.thoughts_token_count,
            )
        )

    def _handle_msg_resumption_update(self, message: LiveServerMessage):
        """Record the latest handle from a session resumption update."""
        update = message.session_resumption_update
        if not (update.resumable and update.new_handle):
            return
        self._session_resumption_handle = update.new_handle

    async def _handle_send_error(self, error: Exception):
        """Treat a send-side error as fatal, unless it was expected.

        Args:
            error: The exception raised while sending to the session.
        """
        # Ignore "expected" errors that may have occurred for messages that
        # were in-flight when a disconnection occurred.
        if self._disconnecting or not self._session:
            return

        # In server-to-server contexts, a WebSocket error should be quite rare.
        # Given how hard it is to recover from a send-side error with proper
        # state management, and that exponential backoff for retries can have
        # cost/stability implications for a service cluster, let's just treat a
        # send-side error as fatal.
        await self.push_error(error_msg=f"Send error: {error}")

    def create_context_aggregator(
        self,
        context: OpenAILLMContext,
        *,
        user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
        assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
    ) -> LLMContextAggregatorPair:
        """Create an instance of GeminiLiveContextAggregatorPair from an OpenAILLMContext.

        Constructor keyword arguments for both the user and assistant aggregators can be provided.

        NOTE: this method exists only for backward compatibility. New code
        should instead do::

            context = LLMContext(...)
            context_aggregator = LLMContextAggregatorPair(context)

        Args:
            context: The LLM context to use.
            user_params: User aggregator parameters. Defaults to LLMUserAggregatorParams().
            assistant_params: Assistant aggregator parameters. Defaults to LLMAssistantAggregatorParams().

        Returns:
            A pair of user and assistant context aggregators.

        .. deprecated:: 0.0.99
            `create_context_aggregator()` is deprecated and will be removed in a future version.
            Use the universal `LLMContext` and `LLMContextAggregatorPair` instead.
            See `OpenAILLMContext` docstring for migration guide.
        """
        # Conversion emits the deprecation warning on our behalf.
        universal_context = LLMContext.from_openai_context(context)
        assistant_params.expect_stripped_words = False
        return LLMContextAggregatorPair(
            universal_context,
            user_params=user_params,
            assistant_params=assistant_params,
        )
