#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""ElevenLabs text-to-speech service implementations.

This module provides WebSocket and HTTP-based TTS services using ElevenLabs API
with support for streaming audio, word timestamps, and voice customization.
"""

import asyncio
import base64
import json
from dataclasses import dataclass, field
from typing import (
    Any,
    AsyncGenerator,
    ClassVar,
    Dict,
    List,
    Literal,
    Mapping,
    Optional,
    Tuple,
    Union,
)

import aiohttp
from loguru import logger
from pydantic import BaseModel

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    ErrorFrame,
    Frame,
    InterruptionFrame,
    LLMFullResponseEndFrame,
    StartFrame,
    TTSAudioRawFrame,
    TTSStartedFrame,
    TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, _warn_deprecated_param
from pipecat.services.tts_service import (
    TextAggregationMode,
    TTSService,
    WebsocketTTSService,
)
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts

# See .env.example for ElevenLabs configuration needed
try:
    import websockets
    from websockets.asyncio.client import connect as websocket_connect
    from websockets.protocol import State
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`.")
    raise Exception(f"Missing module: {e}")

# Output formats usable with this service. "pcm_8000" is included because
# output_format_from_sample_rate() can return it for 8 kHz pipelines.
ElevenLabsOutputFormat = Literal["pcm_8000", "pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"]

# Models that support language codes
# The following models are excluded as they don't support language codes:
# - eleven_flash_v2
# - eleven_turbo_v2
# - eleven_multilingual_v2
ELEVENLABS_MULTILINGUAL_MODELS = {
    "eleven_flash_v2_5",
    "eleven_turbo_v2_5",
}


def language_to_elevenlabs_language(language: Language) -> Optional[str]:
    """Map a Pipecat ``Language`` to the matching ElevenLabs language code.

    Args:
        language: The Language enum member to map.

    Returns:
        The ElevenLabs language code string, or None when the language has
        no ElevenLabs equivalent.
    """
    elevenlabs_codes = {
        Language.AR: "ar",
        Language.BG: "bg",
        Language.CS: "cs",
        Language.DA: "da",
        Language.DE: "de",
        Language.EL: "el",
        Language.EN: "en",
        Language.ES: "es",
        Language.FI: "fi",
        Language.FIL: "fil",
        Language.FR: "fr",
        Language.HI: "hi",
        Language.HR: "hr",
        Language.HU: "hu",
        Language.ID: "id",
        Language.IT: "it",
        Language.JA: "ja",
        Language.KO: "ko",
        Language.MS: "ms",
        Language.NL: "nl",
        Language.NO: "no",
        Language.PL: "pl",
        Language.PT: "pt",
        Language.RO: "ro",
        Language.RU: "ru",
        Language.SK: "sk",
        Language.SV: "sv",
        Language.TA: "ta",
        Language.TR: "tr",
        Language.UK: "uk",
        Language.VI: "vi",
        Language.ZH: "zh",
    }

    # resolve_language handles regional variants via use_base_code=True.
    return resolve_language(language, elevenlabs_codes, use_base_code=True)


def output_format_from_sample_rate(sample_rate: int) -> str:
    """Return the ElevenLabs output format string for a sample rate.

    Args:
        sample_rate: The audio sample rate in Hz.

    Returns:
        The matching ElevenLabs output format string; "pcm_24000" is used
        as the fallback for unsupported rates (with a warning).
    """
    supported_formats = {
        8000: "pcm_8000",
        16000: "pcm_16000",
        22050: "pcm_22050",
        24000: "pcm_24000",
        44100: "pcm_44100",
    }
    output_format = supported_formats.get(sample_rate)
    if output_format is not None:
        return output_format
    logger.warning(
        f"ElevenLabsTTSService: No output format available for {sample_rate} sample rate"
    )
    return "pcm_24000"


def build_elevenlabs_voice_settings(
    settings: Union[Dict[str, Any], "TTSSettings"],
) -> Optional[Dict[str, Union[float, bool]]]:
    """Assemble the ElevenLabs ``voice_settings`` payload.

    Accepts either a plain dict or a ``TTSSettings`` object; only keys with
    a non-None value are included in the result.

    Args:
        settings: Dictionary or settings object with voice parameters.

    Returns:
        A voice settings dict, or None when no parameter is set.
    """

    def lookup(name: str):
        # TTSSettings exposes values as attributes; dicts via .get().
        if isinstance(settings, TTSSettings):
            return getattr(settings, name, None)
        return settings.get(name)

    keys = ("stability", "similarity_boost", "style", "use_speaker_boost", "speed")
    payload = {name: value for name in keys if (value := lookup(name)) is not None}
    return payload if payload else None


class PronunciationDictionaryLocator(BaseModel):
    """Reference to a specific version of an ElevenLabs pronunciation dictionary.

    Parameters:
        pronunciation_dictionary_id: Identifier of the pronunciation dictionary.
        version_id: Identifier of the dictionary version to use.
    """

    pronunciation_dictionary_id: str
    version_id: str


@dataclass
class ElevenLabsTTSSettings(TTSSettings):
    """Settings for ElevenLabsTTSService.

    Fields that appear in the WebSocket URL (``voice``, ``model``,
    ``language``) require a full reconnect when changed.  Fields that
    affect the voice character (``stability``, ``similarity_boost``,
    ``style``, ``use_speaker_boost``, ``speed``) can be applied by closing
    the current audio context so a new one is opened with updated settings.

    Parameters:
        stability: Voice stability control (0.0 to 1.0).
        similarity_boost: Similarity boost control (0.0 to 1.0).
        style: Style control for voice expression (0.0 to 1.0).
        use_speaker_boost: Whether to use speaker boost enhancement.
        speed: Voice speed control (0.7 to 1.2).
        apply_text_normalization: Text normalization mode ("auto", "on", "off").
    """

    # NOT_GIVEN (rather than None) is the default so a settings delta can
    # distinguish "leave unchanged" (NOT_GIVEN) from "explicitly clear" (None).
    stability: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    similarity_boost: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    style: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    use_speaker_boost: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    speed: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    apply_text_normalization: Literal["auto", "on", "off"] | None | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )

    #: Fields in the WS URL — changing any of these requires a reconnect.
    URL_FIELDS: ClassVar[frozenset[str]] = frozenset({"voice", "model", "language"})

    #: Fields affecting voice character — changing these requires closing the
    #: current audio context so the next one picks up new settings.
    VOICE_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset(
        {"stability", "similarity_boost", "style", "use_speaker_boost", "speed"}
    )

@dataclass
class ElevenLabsHttpTTSSettings(TTSSettings):
    """Settings for ElevenLabsHttpTTSService.

    Parameters:
        optimize_streaming_latency: Latency optimization level (0-4).
        stability: Voice stability control (0.0 to 1.0).
        similarity_boost: Similarity boost control (0.0 to 1.0).
        style: Style control for voice expression (0.0 to 1.0).
        use_speaker_boost: Whether to use speaker boost enhancement.
        speed: Voice speed control (0.25 to 4.0).
        apply_text_normalization: Text normalization mode ("auto", "on", "off").
    """

    # NOT_GIVEN (rather than None) is the default so a settings delta can
    # distinguish "leave unchanged" (NOT_GIVEN) from "explicitly clear" (None).
    optimize_streaming_latency: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    stability: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    similarity_boost: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    style: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    use_speaker_boost: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    speed: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    apply_text_normalization: Literal["auto", "on", "off"] | None | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )


def calculate_word_times(
    alignment_info: Mapping[str, Any],
    cumulative_time: float,
    partial_word: str = "",
    partial_word_start_time: float = 0.0,
) -> tuple[List[Tuple[str, float]], str, float]:
    """Turn character-level alignment data into word-level timestamps.

    Words may span alignment chunks; the trailing incomplete word (if any)
    is returned so the next chunk can continue it.

    Args:
        alignment_info: Character alignment data from ElevenLabs API.
        cumulative_time: Base time offset for this chunk.
        partial_word: Partial word carried over from previous chunk.
        partial_word_start_time: Start time of the partial word.

    Returns:
        Tuple of (word_times, new_partial_word, new_partial_word_start_time):
        - word_times: List of (word, timestamp) tuples for complete words
        - new_partial_word: Incomplete word at end of chunk (empty if chunk ends with space)
        - new_partial_word_start_time: Start time of the incomplete word
    """
    chars = alignment_info["chars"]
    start_times_ms = alignment_info["charStartTimesMs"]

    if len(chars) != len(start_times_ms):
        logger.error(
            f"calculate_word_times: length mismatch - chars={len(chars)}, times={len(start_times_ms)}"
        )
        return ([], partial_word, partial_word_start_time)

    completed: List[Tuple[str, float]] = []
    # Seed with any word left over from the previous chunk.
    word = partial_word
    start: Optional[float] = partial_word_start_time if partial_word else None

    for ch, t_ms in zip(chars, start_times_ms):
        if ch == " ":
            # A space terminates the word in progress (ignore runs of spaces).
            if word:
                completed.append((word, start))
                word = ""
                start = None
        else:
            if start is None:
                # First character of a new word: ms → s, plus chunk offset.
                start = cumulative_time + t_ms / 1000.0
            word += ch

    # Whatever is left is the partial word to carry into the next chunk.
    carry_start = start if start is not None else 0.0
    return (completed, word, carry_start)


class ElevenLabsTTSService(WebsocketTTSService):
    """ElevenLabs WebSocket-based TTS service with word timestamps.

    Provides real-time text-to-speech using ElevenLabs' WebSocket streaming API.
    Supports word-level timestamps, audio context management, and various voice
    customization options including stability, similarity boost, and speed controls.
    """

    # Settings type used for this service's runtime-updatable settings.
    Settings = ElevenLabsTTSSettings
    _settings: ElevenLabsTTSSettings

    class InputParams(BaseModel):
        """Input parameters for ElevenLabs TTS configuration.

        .. deprecated:: 0.0.105
            Use ``settings=ElevenLabsTTSSettings(...)`` instead.

        Parameters:
            language: Language to use for synthesis.
            stability: Voice stability control (0.0 to 1.0).
            similarity_boost: Similarity boost control (0.0 to 1.0).
            style: Style control for voice expression (0.0 to 1.0).
            use_speaker_boost: Whether to use speaker boost enhancement.
            speed: Voice speed control (0.7 to 1.2).
            auto_mode: Whether to enable automatic mode optimization.
            enable_ssml_parsing: Whether to parse SSML tags in text.
            enable_logging: Whether to enable ElevenLabs logging.
            apply_text_normalization: Text normalization mode ("auto", "on", "off").
            pronunciation_dictionary_locators: List of pronunciation dictionary locators to use.
        """

        language: Optional[Language] = None
        stability: Optional[float] = None
        similarity_boost: Optional[float] = None
        style: Optional[float] = None
        use_speaker_boost: Optional[bool] = None
        speed: Optional[float] = None
        # NOTE: unlike the other fields, auto_mode defaults to True rather
        # than None, so it always carries a concrete value when params is set.
        auto_mode: Optional[bool] = True
        enable_ssml_parsing: Optional[bool] = None
        enable_logging: Optional[bool] = None
        apply_text_normalization: Optional[Literal["auto", "on", "off"]] = None
        pronunciation_dictionary_locators: Optional[List[PronunciationDictionaryLocator]] = None

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: Optional[str] = None,
        model: Optional[str] = None,
        url: str = "wss://api.elevenlabs.io",
        sample_rate: Optional[int] = None,
        auto_mode: bool = True,
        enable_ssml_parsing: Optional[bool] = None,
        enable_logging: Optional[bool] = None,
        pronunciation_dictionary_locators: Optional[List[PronunciationDictionaryLocator]] = None,
        params: Optional[InputParams] = None,
        settings: Optional[ElevenLabsTTSSettings] = None,
        text_aggregation_mode: Optional[TextAggregationMode] = None,
        aggregate_sentences: Optional[bool] = None,
        **kwargs,
    ):
        """Initialize the ElevenLabs TTS service.

        Args:
            api_key: ElevenLabs API key for authentication.
            voice_id: ID of the voice to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsTTSSettings(voice=...)`` instead.

            model: TTS model to use (e.g., "eleven_turbo_v2_5").

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsTTSSettings(model=...)`` instead.

            url: WebSocket URL for ElevenLabs TTS API.
            sample_rate: Audio sample rate. If None, uses default.
            auto_mode: Whether to enable automatic mode optimization.
            enable_ssml_parsing: Whether to parse SSML tags in text.
            enable_logging: Whether to enable ElevenLabs server-side logging.
            pronunciation_dictionary_locators: List of pronunciation dictionary
                locators to use.
            params: Additional input parameters for voice customization.

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsTTSSettings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            text_aggregation_mode: How to aggregate incoming text before synthesis.
            aggregate_sentences: Whether to aggregate sentences within the TTSService.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead.

            **kwargs: Additional arguments passed to the parent service.
        """
        # By default, we aggregate sentences before sending to TTS. This adds
        # ~200-300ms of latency per sentence (waiting for the sentence-ending
        # punctuation token from the LLM). Setting
        # text_aggregation_mode=TextAggregationMode.TOKEN streams tokens
        # directly. To use this mode, you must set auto_mode=False. This
        # eliminates aggregation time, but slows down ElevenLabs.
        #
        # We also don't want to automatically push LLM response text frames,
        # because the context aggregators will add them to the LLM context even
        # if we're interrupted. ElevenLabs gives us word-by-word timestamps. We
        # can use those to generate text frames ourselves aligned with the
        # playout timing of the audio!
        #
        # Finally, ElevenLabs doesn't provide information on when the bot stops
        # speaking for a while, so we want the parent class to send TTSStopFrame
        # after a short period not receiving any audio.

        # 1. Initialize default_settings with hardcoded defaults
        default_settings = ElevenLabsTTSSettings(
            model="eleven_turbo_v2_5",
            voice=None,
            language=None,
            stability=None,
            similarity_boost=None,
            style=None,
            use_speaker_boost=None,
            speed=None,
            apply_text_normalization=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if voice_id is not None:
            _warn_deprecated_param("voice_id", ElevenLabsTTSSettings, "voice")
            default_settings.voice = voice_id
        if model is not None:
            _warn_deprecated_param("model", ElevenLabsTTSSettings, "model")
            default_settings.model = model

        # 3. Apply params overrides — only if settings not provided
        _pronunciation_dictionary_locators = pronunciation_dictionary_locators
        if params is not None:
            _warn_deprecated_param("params", ElevenLabsTTSSettings)
            if not settings:
                if params.language is not None:
                    default_settings.language = self.language_to_service_language(params.language)
                if params.stability is not None:
                    default_settings.stability = params.stability
                if params.similarity_boost is not None:
                    default_settings.similarity_boost = params.similarity_boost
                if params.style is not None:
                    default_settings.style = params.style
                if params.use_speaker_boost is not None:
                    default_settings.use_speaker_boost = params.use_speaker_boost
                if params.speed is not None:
                    default_settings.speed = params.speed
                # NOTE(review): params.auto_mode defaults to True, so providing
                # any InputParams overrides the auto_mode constructor argument
                # back to True unless the caller set it — confirm intended.
                if params.auto_mode is not None:
                    auto_mode = params.auto_mode
                if params.enable_ssml_parsing is not None:
                    enable_ssml_parsing = params.enable_ssml_parsing
                if params.enable_logging is not None:
                    enable_logging = params.enable_logging
                if params.apply_text_normalization is not None:
                    default_settings.apply_text_normalization = params.apply_text_normalization
                if _pronunciation_dictionary_locators is None:
                    _pronunciation_dictionary_locators = params.pronunciation_dictionary_locators

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            text_aggregation_mode=text_aggregation_mode,
            aggregate_sentences=aggregate_sentences,
            push_text_frames=False,
            push_stop_frames=True,
            pause_frame_processing=True,
            sample_rate=sample_rate,
            settings=default_settings,
            **kwargs,
        )

        self._api_key = api_key
        self._url = url

        # Init-only WebSocket URL params (not runtime-updatable).
        self._auto_mode = auto_mode
        self._enable_ssml_parsing = enable_ssml_parsing
        self._enable_logging = enable_logging

        self._output_format = ""  # initialized in start()
        self._voice_settings = self._set_voice_settings()
        self._pronunciation_dictionary_locators = _pronunciation_dictionary_locators

        # Playout time (seconds) accumulated across alignment chunks.
        self._cumulative_time: float = 0
        # Track partial words that span across alignment chunks
        self._partial_word = ""
        self._partial_word_start_time = 0.0

        # Context management for v1 multi API
        self._receive_task = None
        self._keepalive_task = None

    def can_generate_metrics(self) -> bool:
        """Report whether this service produces processing metrics.

        Returns:
            Always True — the ElevenLabs service supports metrics generation.
        """
        return True

    def language_to_service_language(self, language: Language) -> Optional[str]:
        """Map a Pipecat ``Language`` to the ElevenLabs language code.

        Args:
            language: Language enum member to map.

        Returns:
            ElevenLabs language code string, or None when unsupported.
        """
        return language_to_elevenlabs_language(language)

    def _set_voice_settings(self):
        """Build the voice settings payload from the current service settings."""
        return build_elevenlabs_voice_settings(self._settings)

    async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
        """Apply a settings delta, reconnecting as needed.

        Uses the declarative ``URL_FIELDS`` and ``VOICE_SETTINGS_FIELDS``
        sets on :class:`ElevenLabsTTSSettings` to decide whether to
        reconnect the WebSocket or close the current audio context.

        Args:
            delta: A :class:`TTSSettings` (or ``ElevenLabsTTSSettings``) delta.

        Returns:
            Dict mapping changed field names to their previous values.
        """
        changed = await super()._update_settings(delta)

        if not changed:
            return changed

        # Rebuild voice settings for next context
        self._voice_settings = self._set_voice_settings()

        url_changed = bool(changed.keys() & ElevenLabsTTSSettings.URL_FIELDS)
        voice_settings_changed = bool(changed.keys() & ElevenLabsTTSSettings.VOICE_SETTINGS_FIELDS)

        # A full reconnect already applies new voice settings, so the
        # context-close path below only runs when the URL is unchanged.
        if url_changed:
            logger.debug(
                f"URL-level setting changed ({changed.keys() & ElevenLabsTTSSettings.URL_FIELDS}), "
                f"reconnecting WebSocket"
            )
            await self._disconnect()
            await self._connect()
        elif voice_settings_changed:
            logger.debug(
                f"Voice settings changed ({changed.keys() & ElevenLabsTTSSettings.VOICE_SETTINGS_FIELDS}), "
                f"closing current context to apply changes"
            )
            audio_contexts = self.get_audio_contexts()
            if audio_contexts:
                for ctx_id in audio_contexts:
                    await self._close_context(ctx_id)

        if not url_changed:
            # Reconnect applies all settings; only warn about fields not handled
            # by voice settings or URL changes.
            handled = ElevenLabsTTSSettings.URL_FIELDS | ElevenLabsTTSSettings.VOICE_SETTINGS_FIELDS
            self._warn_unhandled_updated_settings(changed.keys() - handled)

        return changed

    async def start(self, frame: StartFrame):
        """Start the service and open the ElevenLabs connection.

        Args:
            frame: Start frame carrying pipeline initialization parameters.
        """
        await super().start(frame)
        # The output format depends on the sample rate, which is only final
        # once the base service has started.
        self._output_format = output_format_from_sample_rate(self.sample_rate)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the service and tear down the ElevenLabs connection.

        Args:
            frame: End frame signaling pipeline shutdown.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the service immediately and drop the connection.

        Args:
            frame: Cancel frame signaling immediate teardown.
        """
        await super().cancel(frame)
        await self._disconnect()

    async def flush_audio(self, context_id: Optional[str] = None):
        """Ask ElevenLabs to emit any buffered audio for a context.

        Args:
            context_id: The specific context to flush. If None, falls back to
                the currently active context.
        """
        target = context_id if context_id else self.get_active_audio_context_id()
        if target and self._websocket:
            logger.trace(f"{self}: flushing audio")
            await self._websocket.send(json.dumps({"context_id": target, "flush": True}))

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Push a frame and emit a word-timestamp reset when TTS stops.

        Args:
            frame: The frame to push.
            direction: The direction to push the frame.
        """
        await super().push_frame(frame, direction)
        # The previous nested isinstance also matched InterruptionFrame but
        # took no action for it; only TTSStoppedFrame triggers the reset.
        if isinstance(frame, TTSStoppedFrame):
            await self.add_word_timestamps([("Reset", 0)], self.get_active_audio_context_id())

    async def _connect(self):
        """Open the WebSocket and spin up the receive/keepalive tasks."""
        await super()._connect()

        await self._connect_websocket()

        # Only start the helper tasks once a connection actually exists, and
        # never start a second copy of either task.
        if self._websocket:
            if self._receive_task is None:
                self._receive_task = self.create_task(
                    self._receive_task_handler(self._report_error)
                )
            if self._keepalive_task is None:
                self._keepalive_task = self.create_task(self._keepalive_task_handler())

    async def _disconnect(self):
        """Cancel helper tasks, then close the WebSocket connection."""
        await super()._disconnect()

        # Cancel both background tasks before touching the socket itself.
        for attr in ("_receive_task", "_keepalive_task"):
            task = getattr(self, attr)
            if task:
                await self.cancel_task(task)
                setattr(self, attr, None)

        await self._disconnect_websocket()

    async def _connect_websocket(self):
        """Open the multi-stream WebSocket connection to ElevenLabs.

        Builds the connection URL from current settings and the init-only
        flags, then connects with the API key header. On any failure the
        websocket reference is cleared and the error is reported.
        """
        try:
            # Already connected — nothing to do.
            if self._websocket and self._websocket.state is State.OPEN:
                return

            logger.debug("Connecting to ElevenLabs")

            voice_id = self._settings.voice
            model = self._settings.model
            output_format = self._output_format
            url = f"{self._url}/v1/text-to-speech/{voice_id}/multi-stream-input?model_id={model}&output_format={output_format}&auto_mode={str(self._auto_mode).lower()}"

            # Serialize booleans as lowercase "true"/"false" in query params,
            # consistent with the auto_mode serialization above (previously
            # these rendered as Python's "True").
            if self._enable_ssml_parsing:
                url += f"&enable_ssml_parsing={str(self._enable_ssml_parsing).lower()}"

            if self._enable_logging:
                url += f"&enable_logging={str(self._enable_logging).lower()}"

            if self._settings.apply_text_normalization is not None:
                url += f"&apply_text_normalization={self._settings.apply_text_normalization}"

            # Language can only be used with the ELEVENLABS_MULTILINGUAL_MODELS
            language = self._settings.language
            if model in ELEVENLABS_MULTILINGUAL_MODELS and language is not None:
                url += f"&language_code={language}"
                logger.debug(f"Using language code: {language}")
            elif language is not None:
                logger.warning(
                    f"Language code [{language}] not applied. Language codes can only be used with multilingual models: {', '.join(sorted(ELEVENLABS_MULTILINGUAL_MODELS))}"
                )

            # Set max websocket message size to 16MB for large audio responses
            self._websocket = await websocket_connect(
                url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
            )

            await self._call_event_handler("on_connected")
        except Exception as e:
            self._websocket = None
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
            await self._call_event_handler("on_connection_error", f"{e}")

    async def _disconnect_websocket(self):
        """Close the WebSocket connection and reset connection state.

        Regardless of errors, the active audio context is removed, the
        websocket reference is cleared, and ``on_disconnected`` is fired.
        """
        try:
            await self.stop_all_metrics()

            if self._websocket:
                logger.debug("Disconnecting from ElevenLabs")
                # "close_socket" asks the server to end the stream before we
                # close our side of the connection.
                await self._websocket.send(json.dumps({"close_socket": True}))
                await self._websocket.close()
                logger.debug("Disconnected from ElevenLabs")
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
        finally:
            await self.remove_active_audio_context()
            self._websocket = None
            await self._call_event_handler("on_disconnected")

    def _get_websocket(self):
        """Return the connected websocket, raising if there is none."""
        if not self._websocket:
            raise Exception("Websocket not connected")
        return self._websocket

    async def _close_context(self, context_id: str):
        """Close an ElevenLabs audio context and reset word-timing state.

        Args:
            context_id: ID of the audio context to close.
        """
        # ElevenLabs requires that Pipecat explicitly closes contexts to free
        # server-side resources, both on interruption and on normal completion.
        if context_id and self._websocket:
            logger.trace(f"{self}: Closing context {context_id}")
            try:
                # ElevenLabs requires that Pipecat manages the contexts and closes them
                # when they're no longer in use. Since an InterruptionFrame is pushed
                # every time the user speaks, we'll use this as a trigger to close the context
                # and reset the state.
                # Note: We do not need to call remove_audio_context here, as the context is
                # automatically reset when super()._handle_interruption is called.
                await self._websocket.send(
                    json.dumps({"context_id": context_id, "close_context": True})
                )
            except Exception as e:
                await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
        # Reset the cross-chunk word timing state for the next context.
        self._cumulative_time = 0.0
        self._partial_word = ""
        self._partial_word_start_time = 0.0

    async def on_audio_context_interrupted(self, context_id: str):
        """Release the ElevenLabs context after a user interruption."""
        await self._close_context(context_id)

    async def on_audio_context_completed(self, context_id: str):
        """Release the ElevenLabs context once its audio has fully played.

        ElevenLabs sends no server-side signal when a context is exhausted,
        so Pipecat must close it explicitly (``close_context: True``) to
        free server-side resources.
        """
        await self._close_context(context_id)

    async def _receive_messages(self):
        """Handle incoming WebSocket messages from ElevenLabs.

        Routes base64 audio to the matching audio context and converts
        character alignment data into word timestamps, accumulating playout
        time across alignment chunks.
        """
        async for message in self._get_websocket():
            msg = json.loads(message)

            received_ctx_id = msg.get("contextId")

            # Handle final messages first, regardless of context availability
            # At the moment, this message is received AFTER the close_context message is
            # sent, so it doesn't serve any functional purpose. For now, we'll just log it.
            if msg.get("isFinal") is True:
                logger.trace(f"Received final message for context {received_ctx_id}")
                continue

            # Check if this message belongs to the current context.
            if not self.audio_context_available(received_ctx_id):
                if self.get_active_audio_context_id() == received_ctx_id:
                    logger.debug(
                        f"Received a delayed message, recreating the context: {received_ctx_id}"
                    )
                    await self.create_audio_context(received_ctx_id)
                else:
                    # This can happen if a message is received _after_ we have closed a context
                    # due to user interruption but _before_ the `isFinal` message for the context
                    # is received.
                    logger.debug(f"Ignoring message from unavailable context: {received_ctx_id}")
                    continue

            if msg.get("audio"):
                audio = base64.b64decode(msg["audio"])
                frame = TTSAudioRawFrame(audio, self.sample_rate, 1, context_id=received_ctx_id)
                await self.append_to_audio_context(received_ctx_id, frame)

            if msg.get("alignment"):
                alignment = msg["alignment"]
                # Convert character alignment into word timestamps, carrying any
                # partial word over to the next chunk.
                word_times, self._partial_word, self._partial_word_start_time = (
                    calculate_word_times(
                        alignment,
                        self._cumulative_time,
                        self._partial_word,
                        self._partial_word_start_time,
                    )
                )

                if word_times:
                    await self.add_word_timestamps(word_times, received_ctx_id)

                    # Calculate the actual end time of this audio chunk
                    char_start_times_ms = alignment.get("charStartTimesMs", [])
                    char_durations_ms = alignment.get("charDurationsMs", [])

                    if char_start_times_ms and char_durations_ms:
                        # End time = start time of last character + duration of last character
                        chunk_end_time_ms = char_start_times_ms[-1] + char_durations_ms[-1]
                        chunk_end_time_seconds = chunk_end_time_ms / 1000.0
                        self._cumulative_time += chunk_end_time_seconds
                    else:
                        # Fallback: use the last word's start time (current behavior)
                        self._cumulative_time = word_times[-1][1]
                        logger.warning(
                            "_receive_messages: using fallback timing method - consider investigating alignment data structure"
                        )

    async def _keepalive_task_handler(self):
        """Periodically ping the ElevenLabs WebSocket so it stays open."""
        interval_secs = 10
        while True:
            await asyncio.sleep(interval_secs)
            try:
                if not (self._websocket and self._websocket.state is State.OPEN):
                    continue
                active_context = self.get_active_audio_context_id()
                if active_context:
                    # Tie the keepalive to the active context so the in-flight
                    # generation is kept alive along with the connection.
                    logger.trace(f"Sending keepalive for context {active_context}")
                    payload = {"text": "", "context_id": active_context}
                else:
                    # A user interruption may have cleared the context without a
                    # new TTS response starting; an empty message still keeps
                    # the connection alive.
                    logger.trace("Sending keepalive without context")
                    payload = {"text": ""}
                await self._websocket.send(json.dumps(payload))
            except websockets.ConnectionClosed as e:
                logger.warning(f"{self} keepalive error: {e}")
                break

    async def _send_text(self, text: str, context_id: str):
        """Send text to the WebSocket for synthesis."""
        if self._websocket and context_id:
            msg = {"text": text, "context_id": context_id}
            await self._websocket.send(json.dumps(msg))

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
        """Generate speech from text using ElevenLabs' streaming WebSocket API.

        Reconnects the WebSocket if necessary and, for a context ID with no
        audio context yet, creates one (emitting ``TTSStartedFrame`` and
        resetting word-timing state) before streaming the text. Audio is
        delivered asynchronously through the audio context by the receive
        task, not yielded directly from this generator.

        Args:
            text: The text to synthesize into speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Control frames (``TTSStartedFrame`` for a new context;
            ``TTSStoppedFrame``/``ErrorFrame`` on failure), followed by a
            final ``None`` once the text has been dispatched.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        try:
            # Lazily (re)connect if we never connected or the socket closed.
            if not self._websocket or self._websocket.state is State.CLOSED:
                await self._connect()

            try:
                if not self.audio_context_available(context_id):
                    # New context: register it, start TTFB timing, and reset
                    # cumulative word-timestamp state for the new utterance.
                    await self.create_audio_context(context_id)
                    await self.start_ttfb_metrics()
                    yield TTSStartedFrame(context_id=context_id)
                    self._cumulative_time = 0
                    self._partial_word = ""
                    self._partial_word_start_time = 0.0

                    # Initialize context with voice settings and pronunciation dictionaries.
                    # NOTE(review): the single-space text appears to open the context
                    # server-side — confirm against ElevenLabs WebSocket docs.
                    msg = {"text": " ", "context_id": context_id}
                    if self._voice_settings:
                        msg["voice_settings"] = self._voice_settings
                    if self._pronunciation_dictionary_locators:
                        msg["pronunciation_dictionary_locators"] = [
                            locator.model_dump()
                            for locator in self._pronunciation_dictionary_locators
                        ]
                    await self._websocket.send(json.dumps(msg))
                    logger.trace(f"Created new context {context_id}")

                await self._send_text(text, context_id)
                await self.start_tts_usage_metrics(text)
            except Exception as e:
                # Surface the failure to the pipeline: close out the started
                # generation with TTSStoppedFrame before reporting the error.
                yield TTSStoppedFrame(context_id=context_id)
                yield ErrorFrame(error=f"Unknown error occurred: {e}")
                return
            yield None
        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")


class ElevenLabsHttpTTSService(TTSService):
    """ElevenLabs HTTP-based TTS service with word timestamps.

    Provides text-to-speech using ElevenLabs' HTTP streaming API for simpler,
    non-WebSocket integration. Suitable for use cases where streaming WebSocket
    connection is not required or desired.
    """

    # Canonical runtime settings type for this service.
    Settings = ElevenLabsHttpTTSSettings
    # Narrow the type of the settings object stored by the base class
    # (``__init__`` passes an ``ElevenLabsHttpTTSSettings`` to ``super()``).
    _settings: ElevenLabsHttpTTSSettings

    class InputParams(BaseModel):
        """Input parameters for ElevenLabs HTTP TTS configuration.

        .. deprecated:: 0.0.105
            Use ``settings=ElevenLabsHttpTTSSettings(...)`` instead.

        Parameters:
            language: Language to use for synthesis.
            optimize_streaming_latency: Latency optimization level (0-4).
            stability: Voice stability control (0.0 to 1.0).
            similarity_boost: Similarity boost control (0.0 to 1.0).
            style: Style control for voice expression (0.0 to 1.0).
            use_speaker_boost: Whether to use speaker boost enhancement.
            speed: Voice speed control (0.25 to 4.0).
            apply_text_normalization: Text normalization mode ("auto", "on", "off").
            pronunciation_dictionary_locators: List of pronunciation dictionary locators to use.
        """

        # All fields default to None, meaning "not specified": only non-None
        # values override the service defaults (see ``__init__``).
        language: Optional[Language] = None
        optimize_streaming_latency: Optional[int] = None
        stability: Optional[float] = None
        similarity_boost: Optional[float] = None
        style: Optional[float] = None
        use_speaker_boost: Optional[bool] = None
        speed: Optional[float] = None
        apply_text_normalization: Optional[Literal["auto", "on", "off"]] = None
        pronunciation_dictionary_locators: Optional[List[PronunciationDictionaryLocator]] = None

    def __init__(
        self,
        *,
        api_key: str,
        voice_id: Optional[str] = None,
        aiohttp_session: aiohttp.ClientSession,
        model: Optional[str] = None,
        base_url: str = "https://api.elevenlabs.io",
        sample_rate: Optional[int] = None,
        pronunciation_dictionary_locators: Optional[List[PronunciationDictionaryLocator]] = None,
        params: Optional[InputParams] = None,
        settings: Optional[ElevenLabsHttpTTSSettings] = None,
        text_aggregation_mode: Optional[TextAggregationMode] = None,
        aggregate_sentences: Optional[bool] = None,
        **kwargs,
    ):
        """Initialize the ElevenLabs HTTP TTS service.

        Args:
            api_key: ElevenLabs API key for authentication.
            voice_id: ID of the voice to use for synthesis.

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsHttpTTSSettings(voice=...)`` instead.

            aiohttp_session: aiohttp ClientSession for HTTP requests.
            model: TTS model to use (e.g., "eleven_turbo_v2_5").

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsHttpTTSSettings(model=...)`` instead.

            base_url: Base URL for ElevenLabs HTTP API.
            sample_rate: Audio sample rate. If None, uses default.
            pronunciation_dictionary_locators: List of pronunciation dictionary
                locators to use.
            params: Additional input parameters for voice customization.

                .. deprecated:: 0.0.105
                    Use ``settings=ElevenLabsHttpTTSSettings(...)`` instead.

            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            text_aggregation_mode: How to aggregate incoming text before synthesis.
            aggregate_sentences: Whether to aggregate sentences within the TTSService.

                .. deprecated:: 0.0.104
                    Use ``text_aggregation_mode`` instead.

            **kwargs: Additional arguments passed to the parent service.
        """
        # Settings are resolved in increasing order of precedence:
        # 1. Hardcoded defaults.
        default_settings = ElevenLabsHttpTTSSettings(
            model="eleven_turbo_v2_5",
            voice=None,
            language=None,
            optimize_streaming_latency=None,
            stability=None,
            similarity_boost=None,
            style=None,
            use_speaker_boost=None,
            speed=None,
            apply_text_normalization=None,
        )

        # 2. Direct init-arg overrides (deprecated; each emits a warning).
        if voice_id is not None:
            _warn_deprecated_param("voice_id", ElevenLabsHttpTTSSettings, "voice")
            default_settings.voice = voice_id
        if model is not None:
            _warn_deprecated_param("model", ElevenLabsHttpTTSSettings, "model")
            default_settings.model = model

        # 3. `params` overrides (deprecated) — ignored entirely when the
        #    canonical `settings` object is supplied.
        # The explicit constructor arg wins; fall back to params' locators
        # below only if it was not given.
        _pronunciation_dictionary_locators = pronunciation_dictionary_locators
        if params is not None:
            _warn_deprecated_param("params", ElevenLabsHttpTTSSettings)
            if not settings:
                if params.language is not None:
                    default_settings.language = self.language_to_service_language(params.language)
                if params.optimize_streaming_latency is not None:
                    default_settings.optimize_streaming_latency = params.optimize_streaming_latency
                if params.stability is not None:
                    default_settings.stability = params.stability
                if params.similarity_boost is not None:
                    default_settings.similarity_boost = params.similarity_boost
                if params.style is not None:
                    default_settings.style = params.style
                if params.use_speaker_boost is not None:
                    default_settings.use_speaker_boost = params.use_speaker_boost
                if params.speed is not None:
                    default_settings.speed = params.speed
                if params.apply_text_normalization is not None:
                    default_settings.apply_text_normalization = params.apply_text_normalization
                if _pronunciation_dictionary_locators is None:
                    _pronunciation_dictionary_locators = params.pronunciation_dictionary_locators

        # 4. `settings` delta (canonical API, always wins).
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            text_aggregation_mode=text_aggregation_mode,
            aggregate_sentences=aggregate_sentences,
            push_text_frames=False,
            push_stop_frames=True,
            push_start_frame=True,
            sample_rate=sample_rate,
            settings=default_settings,
            **kwargs,
        )

        self._api_key = api_key
        self._base_url = base_url
        self._session = aiohttp_session

        self._output_format = ""  # initialized in start()
        self._voice_settings = self._set_voice_settings()
        self._pronunciation_dictionary_locators = _pronunciation_dictionary_locators

        # Track cumulative time to properly sequence word timestamps across utterances
        self._cumulative_time = 0

        # Store previous text for context within a turn
        self._previous_text = ""

        # Track partial words that span across alignment chunks
        self._partial_word = ""
        self._partial_word_start_time = 0.0

    def language_to_service_language(self, language: Language) -> Optional[str]:
        """Map a pipecat ``Language`` to the matching ElevenLabs language code.

        Args:
            language: The language to convert.

        Returns:
            The ElevenLabs-specific language code, or None if not supported.
        """
        service_code = language_to_elevenlabs_language(language)
        return service_code

    def can_generate_metrics(self) -> bool:
        """Report whether this service produces processing metrics.

        Returns:
            True, as ElevenLabs HTTP service supports metrics generation.
        """
        return True

    def _set_voice_settings(self):
        # Derive the voice_settings request payload from the current service
        # settings; re-invoked whenever settings change (see _update_settings).
        return build_elevenlabs_voice_settings(self._settings)

    async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
        """Apply a settings delta and rebuild voice settings.

        Args:
            delta: A :class:`TTSSettings` (or ``ElevenLabsHttpTTSSettings``) delta.

        Returns:
            Dict mapping changed field names to their previous values.
        """
        previous_values = await super()._update_settings(delta)
        if not previous_values:
            return previous_values
        # Some updated fields feed the voice_settings payload, so rebuild it
        # whenever anything actually changed.
        self._voice_settings = self._set_voice_settings()
        return previous_values

    def _reset_state(self):
        """Clear per-turn timing, continuity, and partial-word state."""
        self._previous_text = ""
        self._partial_word = ""
        self._partial_word_start_time = 0.0
        self._cumulative_time = 0
        logger.debug(f"{self}: Reset internal state")

    async def start(self, frame: StartFrame):
        """Start the ElevenLabs HTTP TTS service.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)
        # The output format depends on the sample rate, which is only known
        # once the pipeline has started.
        self._output_format = output_format_from_sample_rate(self.sample_rate)
        self._reset_state()

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Push a frame downstream and update per-turn state accordingly.

        Args:
            frame: The frame to push.
            direction: The direction to push the frame.
        """
        await super().push_frame(frame, direction)

        if isinstance(frame, LLMFullResponseEndFrame):
            # End of the LLM turn: stop carrying prosody context forward.
            self._previous_text = ""
            return

        if isinstance(frame, (InterruptionFrame, TTSStoppedFrame)):
            # Interruptions and stops both invalidate timing/continuity state.
            self._reset_state()
            if isinstance(frame, TTSStoppedFrame):
                # NOTE(review): ("Reset", 0) looks like a sentinel the word-
                # timestamp consumer recognizes — confirm downstream handling.
                await self.add_word_timestamps([("Reset", 0)])

    def calculate_word_times(self, alignment_info: Mapping[str, Any]) -> List[Tuple[str, float]]:
        """Convert character-level alignment data into word start times.

        Words may straddle alignment chunks: a trailing fragment is stashed on
        the instance and prepended to the next chunk's characters.

        Args:
            alignment_info: Character timing data from ElevenLabs.

        Returns:
            List of (word, timestamp) pairs for complete words in this chunk.

        Example input data::

            {
                "characters": [" ", "H", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d"],
                "character_start_times_seconds": [0.0, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
                "character_end_times_seconds": [0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
            }

        Would produce word times (with cumulative_time=0)::

            [("Hello", 0.1), ("world", 0.5)]
        """
        chars = alignment_info.get("characters", [])
        char_start_times = alignment_info.get("character_start_times_seconds", [])

        # Bail out on missing or mismatched alignment arrays.
        if not chars or not char_start_times or len(chars) != len(char_start_times):
            logger.warning(
                f"Invalid alignment data: chars={len(chars)}, times={len(char_start_times)}"
            )
            return []

        word_times: List[Tuple[str, float]] = []
        # Resume any word left unfinished by the previous chunk.
        pending = self._partial_word
        pending_start = self._partial_word_start_time if self._partial_word else None

        for ch, start in zip(chars, char_start_times):
            if ch != " ":
                if pending_start is None:
                    # First character of a new word: offset by cumulative time
                    # so timestamps keep increasing across utterances.
                    pending_start = self._cumulative_time + start
                pending += ch
            elif pending:
                # A space terminates the word in progress.
                word_times.append((pending, pending_start))
                pending = ""
                pending_start = None

        # Carry any unterminated trailing word over to the next chunk.
        self._partial_word = pending
        self._partial_word_start_time = pending_start if pending_start is not None else 0.0

        return word_times

    @traced_tts
    async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]:
        """Generate speech from text using ElevenLabs streaming API with timestamps.

        Makes a request to the ElevenLabs API to generate audio and timing data.
        Tracks the duration of each utterance to ensure correct sequencing.
        Includes previous text as context for better prosody continuity.

        Args:
            text: Text to convert to speech.
            context_id: The context ID for tracking audio frames.

        Yields:
            Frame: Audio and control frames containing the synthesized speech.
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

        # Use the with-timestamps endpoint so character alignment data is
        # streamed alongside the audio.
        url = f"{self._base_url}/v1/text-to-speech/{self._settings.voice}/stream/with-timestamps"

        payload: Dict[str, Union[str, Dict[str, Union[float, bool]]]] = {
            "text": text,
            "model_id": self._settings.model,
        }

        # Include previous text as context if available (prosody continuity).
        if self._previous_text:
            payload["previous_text"] = self._previous_text

        if self._voice_settings:
            payload["voice_settings"] = self._voice_settings

        if self._pronunciation_dictionary_locators:
            payload["pronunciation_dictionary_locators"] = [
                locator.model_dump() for locator in self._pronunciation_dictionary_locators
            ]

        if self._settings.apply_text_normalization is not None:
            payload["apply_text_normalization"] = self._settings.apply_text_normalization

        # Language codes are only honored by the multilingual models.
        language = self._settings.language
        if self._settings.model in ELEVENLABS_MULTILINGUAL_MODELS and language:
            payload["language_code"] = language
            logger.debug(f"Using language code: {language}")
        elif language:
            logger.warning(
                f"Language code [{language}] not applied. Language codes can only be used with multilingual models: {', '.join(sorted(ELEVENLABS_MULTILINGUAL_MODELS))}"
            )

        headers = {
            "xi-api-key": self._api_key,
            "Content-Type": "application/json",
        }

        # Build query parameters
        params = {
            "output_format": self._output_format,
        }
        if self._settings.optimize_streaming_latency is not None:
            params["optimize_streaming_latency"] = self._settings.optimize_streaming_latency

        try:
            async with self._session.post(
                url, json=payload, headers=headers, params=params
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    yield ErrorFrame(error=f"ElevenLabs API error: {error_text}")
                    return

                await self.start_tts_usage_metrics(text)

                # Track the duration of this utterance based on the last character's end time
                utterance_duration = 0
                # The response body is a stream of newline-delimited JSON
                # objects, each possibly carrying audio and/or alignment data.
                async for line in response.content:
                    line_str = line.decode("utf-8").strip()
                    if not line_str:
                        continue

                    try:
                        # Parse the JSON object
                        data = json.loads(line_str)

                        # Process audio if present; TTFB metrics stop once
                        # audio starts arriving.
                        if data and "audio_base64" in data:
                            await self.stop_ttfb_metrics()
                            audio = base64.b64decode(data["audio_base64"])
                            yield TTSAudioRawFrame(
                                audio, self.sample_rate, 1, context_id=context_id
                            )

                        # Process alignment if present
                        if data and "alignment" in data:
                            alignment = data["alignment"]
                            if alignment:  # Ensure alignment is not None
                                # Get end time of the last character in this chunk
                                char_end_times = alignment.get("character_end_times_seconds", [])
                                if char_end_times:
                                    chunk_end_time = char_end_times[-1]
                                    # Update to the longest end time seen so far
                                    utterance_duration = max(utterance_duration, chunk_end_time)

                                # Calculate word timestamps
                                word_times = self.calculate_word_times(alignment)
                                if word_times:
                                    await self.add_word_timestamps(word_times, context_id)
                    except json.JSONDecodeError as e:
                        # Skip malformed lines rather than aborting the stream.
                        logger.warning(f"Failed to parse JSON from stream: {e}")
                        continue
                    except Exception as e:
                        yield ErrorFrame(error=f"Unknown error occurred: {e}")
                        continue

                # After processing all chunks, emit any remaining partial word
                # since this is the end of the utterance
                if self._partial_word:
                    final_word_time = [(self._partial_word, self._partial_word_start_time)]
                    await self.add_word_timestamps(final_word_time, context_id)
                    self._partial_word = ""
                    self._partial_word_start_time = 0.0

                # After processing all chunks, add the total utterance duration
                # to the cumulative time to ensure next utterance starts after this one
                if utterance_duration > 0:
                    self._cumulative_time += utterance_duration

                # Append the current text to previous_text for context continuity
                # Only add a space if there's already text
                if self._previous_text:
                    self._previous_text += " " + text
                else:
                    self._previous_text = text

        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
        finally:
            # Always close out TTFB timing, even on error paths.
            await self.stop_ttfb_metrics()